-
Notifications
You must be signed in to change notification settings - Fork 160
feat(server): implement Redis-based event queue manager #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- Add RedisQueueManager class to handle event queues using Redis - Implement core functionalities: add, get, tap, close, create_or_tap - Add unit tests for RedisQueueManager - Update .gitignore to exclude spec.json and .idea - Refactor event type definition in event_queue.py - Add fakeredis dependency in pyproject.toml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @RobinQu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant architectural enhancement by integrating Redis as a backend for the server's event queue management. This new Redis-based manager facilitates robust and scalable event handling, enabling efficient communication and state synchronization across different processes or services. The changes also include necessary dependency updates and type definition refinements to support this new distributed eventing capability.
Highlights
- New Feature: Redis Event Queue Manager: Implemented the
RedisQueueManager
class, which provides a Redis-backed solution for managing event queues. This enables cross-process event relay using Redis Pub/Sub. - Core Event Queue Functionality: The
RedisQueueManager
includes core methods for handling event queues:add
to register a new queue,get
to retrieve an existing queue,tap
to create a new consumer for a queue,close
to shut down a queue, andcreate_or_tap
for convenient queue management. - Event Type Definition Refinement: Refactored the
Event
type alias inevent_queue.py
to utilize Pydantic'sAnnotated
andField(discriminator="kind")
. This improves the handling and serialization/deserialization of polymorphic event types. - Testing and Dependencies: Comprehensive unit tests for the
RedisQueueManager
have been added, leveragingfakeredis
to simulate Redis interactions without requiring a live server. Thefakeredis
dependency was also added topyproject.toml
.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a RedisQueueManager
for handling event queues with Redis. There are a few high-severity issues related to race conditions and resource leaks that need to be addressed. Additionally, the test coverage should be expanded to include the new pub/sub and proxy queue functionality to ensure its correctness.
@pstephengoogle I will fix failed checks later. Let's dicuss implmentation detail first. |
This aligns with how I thought this could be achieved. Do you feel this addresses all your requirements? |
This is enough for a MVP version. Some TTL housekeep for Redis keys should be added later. Please give me some help about your unit tests. It seems that your present code fails those tests. |
- Add RedisQueueManager class to manage event queues across distributed services - Implement local and proxy queue management using Redis pub/sub - Add logging for better visibility and debugging - Update tests to cover new functionality
- Replace Redis set with sorted set to store task IDs with timestamp - Add TTL update mechanism for active task IDs - Implement periodic cleanup of expired task IDs - Update task registration and removal to use new sorted set structure
…ions - Add type annotation for parsed event in RedisQueueManager - Update test assertions to check tap call count in queue manager tests
- Enhance EventQueue to handle shutdown process more efficiently- Optimize RedisQueueManager for better task event handling - Add comprehensive test cases for queue management scenarios
- Add TTL (Time To Live) for task IDs in the global registry - Implement node affiliation for task IDs to prevent message broadcasting to outdated task queues - Update task registration and message relaying logic to support the new features - Add test cases for task ID expiration and node affiliation
…bability_to_clean_expired' parameter - Update test_integration.py to reflect changes in RedisQueueManager
Never mind. I figured the problem. Hope you guys give some comments and finish this PR. |
pyproject.toml
Outdated
@@ -21,6 +21,8 @@ dependencies = [ | |||
"grpcio-tools>=1.60", | |||
"grpcio_reflection>=1.7.0", | |||
"protobuf==5.29.5", | |||
"fakeredis>=2.30.1", | |||
"redis>=6.2.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO redis
should be optional dependency. Also, you don't need to specify fakeredis
here, since you already added it to dev
group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
BTW, should I update uv.lock file as well? After uv sync
, uv.lock is changed a lot. Or you will do the file update in your release pipeline?
Should this not be an abstract interface for any queue / event manager to implement with, and we decouple this by defining a generic queue/event‐manager interface rather than embedding a Redis‐specific implementation directly into the library. Embedding a single vendor’s system (one that recently caused some drama around changing their license) risks locking users into that choice and makes it harder to support alternative backends down the line. Instead, let’s introduce an abstract interface (e.g. AbstractQueue or AbstractEventManager) so that anyone can plug in Redis, Valkey, RabbitMQ, AWS SQS, or any other system they prefer. This will keep our core library lean, flexible, and truly pluggable. |
- Remove fakeredis from dependencies - Make redis optional by moving it to the 'redis' extra - Update RedisQueueManager to handle optional node_id - Improve error handling and logging in RedisQueueManager
I raised the same opnion in a2aproject/A2A#674 (comment) I think some major refactoring is needed in order to truely make the queue mechanism pluggable. AFAIK, this libraray has already contains some vendor specific code like As a library user, I would like this library contains out-of-box implementations of necessary components, like In terms of |
The more I think about this, the less inclined I am to have us set this as a precedence, if we have already done so with Databases, I don't think that's a good enough reason to double down and add more opinionated implementations. As this is a Linux Foundation project, we should attempt to provide interfaces available to all projects to use, otherwise we may (and very likely will) end up carrying maintenance in tree of a many vendor solutions (some of which will fall out of maintenance). Trust me I have been there with Kubernetes, openstack and other such projects. For that reason I really don't think this is a good idea if we can avoid it, instead we should provide an agnostic / abstract base interfaces and then we can have /examples showing how to use with any given external project, just as we have for frameworks (langchain, crewAI etc). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just adding a request changes, as i think this needs discussion first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @lukehinds — and I think the same kind of discussion should apply to DatabaseTaskStore
as well.
Given the protocol-oriented nature of this project, especially under the Linux Foundation, we should aim to keep server-side components as abstract and vendor-neutral as possible. In fact, it might be more appropriate to treat such implementations as reference implementations rather than embedding them in the core.
@lukehinds How will the discussion be initiated and who should be invited? Is there anyone who is actually pushing forward this issue? |
Sorry was OOO for the long weekend. To clarify a few things:
To that end, I think it makes sense to commit an |
Description
Thank you for opening a Pull Request!
Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
CONTRIBUTING
Guide.fix:
which represents bug fixes, and correlates to a SemVer patch.feat:
represents a new feature, and correlates to a SemVer minor.feat!:
, orfix!:
,refactor!:
, etc., which represent a breaking change (indicated by the!
) and will result in a SemVer major.nox -s format
from the repository root to format)Related to a2aproject/A2A#674 (comment)