feat(server): implement Redis-based event queue manager #269

Open
wants to merge 11 commits into
base: main

Conversation

RobinQu
@RobinQu RobinQu commented Jul 2, 2025

  • Add RedisQueueManager class to handle event queues using Redis
  • Implement core functionalities: add, get, tap, close, create_or_tap
  • Add unit tests for RedisQueueManager
  • Update .gitignore to exclude spec.json and .idea
  • Refactor event type definition in event_queue.py
  • Add fakeredis dependency in pyproject.toml

Description

Thank you for opening a Pull Request!
Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Follow the CONTRIBUTING Guide.
  • Make your Pull Request title in the https://www.conventionalcommits.org/ specification.
    • Important Prefixes for release-please:
      • fix: which represents bug fixes, and correlates to a SemVer patch.
      • feat: represents a new feature, and correlates to a SemVer minor.
      • feat!:, or fix!:, refactor!:, etc., which represent a breaking change (indicated by the !) and will result in a SemVer major.
  • Ensure the tests and linter pass (Run nox -s format from the repository root to format)
  • Appropriate docs were updated (if necessary)

Related to a2aproject/A2A#674 (comment)

long.qul added 2 commits July 2, 2025 17:38
- Add RedisQueueManager class to handle event queues using Redis
- Implement core functionalities: add, get, tap, close, create_or_tap
- Add unit tests for RedisQueueManager
- Update .gitignore to exclude spec.json and .idea
- Refactor event type definition in event_queue.py
- Add fakeredis dependency in pyproject.toml
@RobinQu RobinQu requested a review from a team as a code owner July 2, 2025 14:47
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @RobinQu, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural enhancement by integrating Redis as a backend for the server's event queue management. This new Redis-based manager facilitates robust and scalable event handling, enabling efficient communication and state synchronization across different processes or services. The changes also include necessary dependency updates and type definition refinements to support this new distributed eventing capability.

Highlights

  • New Feature: Redis Event Queue Manager: Implemented the RedisQueueManager class, which provides a Redis-backed solution for managing event queues. This enables cross-process event relay using Redis Pub/Sub.
  • Core Event Queue Functionality: The RedisQueueManager includes core methods for handling event queues: add to register a new queue, get to retrieve an existing queue, tap to create a new consumer for a queue, close to shut down a queue, and create_or_tap for convenient queue management.
  • Event Type Definition Refinement: Refactored the Event type alias in event_queue.py to utilize Pydantic's Annotated and Field(discriminator="kind"). This improves the handling and serialization/deserialization of polymorphic event types.
  • Testing and Dependencies: Comprehensive unit tests for the RedisQueueManager have been added, leveraging fakeredis to simulate Redis interactions without requiring a live server. The fakeredis dependency was also added to pyproject.toml.
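
To make the relay idea in the highlights concrete: an event that crosses a process boundary has to be serialized with enough information to rebuild its concrete type on the other side, which is what the `kind` discriminator provides. The summary says the PR does this with Pydantic's `Annotated` and `Field(discriminator="kind")`; the sketch below is not the PR's code and uses only the standard library to show the same dispatch-on-`kind` principle. The `Message` and `StatusUpdate` classes and the `encode_event`/`decode_event` helpers are hypothetical names chosen for illustration.

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical event types for illustration; the SDK defines its own models.
@dataclass
class Message:
    text: str
    kind: str = "message"


@dataclass
class StatusUpdate:
    state: str
    kind: str = "status-update"


# Maps each discriminator value to the class that should be rebuilt from it.
EVENT_TYPES = {"message": Message, "status-update": StatusUpdate}


def encode_event(event) -> str:
    """Serialize an event to JSON; the `kind` field travels with the payload."""
    return json.dumps(asdict(event))


def decode_event(raw: str):
    """Rebuild the concrete event type by dispatching on `kind`."""
    data = json.loads(raw)
    cls = EVENT_TYPES.get(data.get("kind"))
    if cls is None:
        raise ValueError(f"unknown event kind: {data.get('kind')!r}")
    return cls(**data)
```

A publisher would send the string returned by `encode_event` over the channel, and the subscriber on another node would pass the received string to `decode_event` before putting the result on its local queue.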
Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a RedisQueueManager for handling event queues with Redis. There are a few high-severity issues related to race conditions and resource leaks that need to be addressed. Additionally, the test coverage should be expanded to include the new pub/sub and proxy queue functionality to ensure its correctness.

@RobinQu
Author

RobinQu commented Jul 2, 2025

@pstephengoogle I will fix the failed checks later. Let's discuss the implementation details first.

@pstephengoogle
Contributor

This aligns with how I thought this could be achieved. Do you feel this addresses all your requirements?

@RobinQu
Author

RobinQu commented Jul 3, 2025

> This aligns with how I thought this could be achieved. Do you feel this addresses all your requirements?

This is enough for an MVP. Some TTL housekeeping for Redis keys should be added later.

Please give me some help with your unit tests. It seems that your current code fails those tests.

long.qul added 7 commits July 3, 2025 18:10
- Add RedisQueueManager class to manage event queues across distributed services
- Implement local and proxy queue management using Redis pub/sub
- Add logging for better visibility and debugging
- Update tests to cover new functionality
- Replace Redis set with sorted set to store task IDs with timestamp
- Add TTL update mechanism for active task IDs
- Implement periodic cleanup of expired task IDs
- Update task registration and removal to use new sorted set structure
…ions

- Add type annotation for parsed event in RedisQueueManager
- Update test assertions to check tap call count in queue manager tests
- Enhance EventQueue to handle shutdown process more efficiently
- Optimize RedisQueueManager for better task event handling
- Add comprehensive test cases for queue management scenarios
- Add TTL (Time To Live) for task IDs in the global registry
- Implement node affiliation for task IDs to prevent message broadcasting to outdated task queues
- Update task registration and message relaying logic to support the new features
- Add test cases for task ID expiration and node affiliation
…bability_to_clean_expired' parameter

- Update test_integration.py to reflect changes in RedisQueueManager
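
The commit messages above describe storing task IDs in a sorted set keyed by timestamp, refreshing the timestamp while a task is active, and occasionally sweeping out expired entries. The following is a minimal in-memory sketch of that bookkeeping, not the PR's implementation: a plain dict stands in for the Redis sorted set (the Redis analogues would be `ZADD` and `ZREMRANGEBYSCORE`), and the class name `TaskRegistry` and the `cleanup_probability` parameter are hypothetical.

```python
import random
import time
from typing import Dict, Optional


class TaskRegistry:
    """In-memory stand-in for a sorted set: member = task ID, score = last-seen time."""

    def __init__(self, ttl_seconds: float, cleanup_probability: float = 0.1) -> None:
        self.ttl_seconds = ttl_seconds
        # Hypothetical knob: chance that a write also triggers a sweep.
        self.cleanup_probability = cleanup_probability
        self._scores: Dict[str, float] = {}

    def touch(self, task_id: str, now: Optional[float] = None) -> None:
        """Register a task or refresh its timestamp (analogous to ZADD)."""
        now = time.time() if now is None else now
        self._scores[task_id] = now
        # Amortize cleanup so that only some writes pay for a full sweep.
        if random.random() < self.cleanup_probability:
            self.remove_expired(now)

    def remove_expired(self, now: Optional[float] = None) -> int:
        """Drop tasks older than the TTL (analogous to ZREMRANGEBYSCORE)."""
        now = time.time() if now is None else now
        cutoff = now - self.ttl_seconds
        expired = [t for t, score in self._scores.items() if score < cutoff]
        for task_id in expired:
            del self._scores[task_id]
        return len(expired)

    def is_active(self, task_id: str, now: Optional[float] = None) -> bool:
        """A task is active if it was touched within the last `ttl_seconds`."""
        now = time.time() if now is None else now
        score = self._scores.get(task_id)
        return score is not None and score >= now - self.ttl_seconds
```

Checking the score in `is_active` means a stale task is treated as expired even if no sweep has removed it yet, so the probabilistic cleanup only affects memory use, not correctness.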
@RobinQu
Author

RobinQu commented Jul 4, 2025

> > This aligns with how I thought this could be achieved. Do you feel this addresses all your requirements?
>
> This is enough for an MVP. Some TTL housekeeping for Redis keys should be added later.
>
> Please give me some help with your unit tests. It seems that your current code fails those tests.

Never mind. I figured out the problem. I hope you can leave some comments so we can finish this PR.

pyproject.toml Outdated
@@ -21,6 +21,8 @@ dependencies = [
   "grpcio-tools>=1.60",
   "grpcio_reflection>=1.7.0",
   "protobuf==5.29.5",
+  "fakeredis>=2.30.1",
+  "redis>=6.2.0",
Contributor

IMO redis should be an optional dependency. Also, you don't need to specify fakeredis here, since you already added it to the dev group.

Author

Done.

BTW, should I update the uv.lock file as well? After uv sync, uv.lock changed a lot. Or will you update the file in your release pipeline?

@lukehinds
Contributor

lukehinds commented Jul 4, 2025

Shouldn't this be an abstract interface that any queue/event manager can implement? We could decouple this by defining a generic queue/event-manager interface rather than embedding a Redis-specific implementation directly into the library.

Embedding a single vendor’s system (one that recently caused some drama around changing their license) risks locking users into that choice and makes it harder to support alternative backends down the line. Instead, let’s introduce an abstract interface (e.g. AbstractQueue or AbstractEventManager) so that anyone can plug in Redis, Valkey, RabbitMQ, AWS SQS, or any other system they prefer. This will keep our core library lean, flexible, and truly pluggable.

- Remove fakeredis from dependencies
- Make redis optional by moving it to the 'redis' extra
- Update RedisQueueManager to handle optional node_id
- Improve error handling and logging in RedisQueueManager
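
Moving redis into an extra means the module that uses it can no longer assume the package is installed. A common way to handle that is a guarded import that fails with an actionable message. The helper below is a generic sketch of that pattern, assuming nothing about how the PR itself handles the import; `require_optional` is a hypothetical name.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency, or explain which extra provides it."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Chain the original error so the underlying cause stays visible.
        raise ImportError(
            f"'{module_name}' is required for this feature. "
            f"Install the package with the '{extra}' extra."
        ) from exc
```

A Redis-backed manager could call `require_optional("redis", "redis")` in its constructor, so users who never touch that class are not forced to install the client.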
@RobinQu
Author

RobinQu commented Jul 5, 2025

> Shouldn't this be an abstract interface that any queue/event manager can implement? We could decouple this by defining a generic queue/event-manager interface rather than embedding a Redis-specific implementation directly into the library.
>
> Embedding a single vendor’s system (one that recently caused some drama around changing their license) risks locking users into that choice and makes it harder to support alternative backends down the line. Instead, let’s introduce an abstract interface (e.g. AbstractQueue or AbstractEventManager) so that anyone can plug in Redis, Valkey, RabbitMQ, AWS SQS, or any other system they prefer. This will keep our core library lean, flexible, and truly pluggable.

I raised the same opinion in a2aproject/A2A#674 (comment)

I think some major refactoring is needed to truly make the queue mechanism pluggable. AFAIK, this library already contains some vendor-specific code, like DatabaseTaskStore, which supports MySQL, PostgreSQL, and SQLite as TaskStore storage.

As a library user, I would like this library to contain out-of-the-box implementations of necessary components, like QueueManager and TaskStore, as well as proper abstractions that allow me to have my private toys. These won't conflict with each other.

In terms of QueueManager, I am surprised to see there is no single implementation that supports a distributed scenario, and I think RedisQueueManager would be a minimal component to demonstrate how you would handle multiple processes or multiple nodes in a horizontally scalable deployment. To be honest, this implementation is sufficient for small to medium businesses, covering the needs of most library users.

@lukehinds
Contributor

lukehinds commented Jul 5, 2025

The more I think about this, the less inclined I am to have us set this as a precedent. If we have already done so with databases, I don't think that's a good enough reason to double down and add more opinionated implementations.

As this is a Linux Foundation project, we should attempt to provide interfaces available for all projects to use; otherwise we may (and very likely will) end up carrying in-tree maintenance of many vendor solutions (some of which will fall out of maintenance). Trust me, I have been there with Kubernetes, OpenStack, and other such projects.

For that reason I really don't think this is a good idea if we can avoid it. Instead, we should provide agnostic, abstract base interfaces, and then we can have /examples showing how to use them with any given external project, just as we have for frameworks (LangChain, CrewAI, etc.).

Contributor

@lukehinds lukehinds left a comment

Just adding a request for changes, as I think this needs discussion first.

Contributor

@ognis1205 ognis1205 left a comment

I agree with @lukehinds — and I think the same kind of discussion should apply to DatabaseTaskStore as well.

Given the protocol-oriented nature of this project, especially under the Linux Foundation, we should aim to keep server-side components as abstract and vendor-neutral as possible. In fact, it might be more appropriate to treat such implementations as reference implementations rather than embedding them in the core.

@RobinQu
Author

RobinQu commented Jul 7, 2025

@lukehinds How will the discussion be initiated and who should be invited? Is there anyone who is actually pushing forward this issue?

@pstephengoogle
Contributor

pstephengoogle commented Jul 7, 2025

Sorry, I was OOO for the long weekend. To clarify a few things:

  • The DatabaseTaskStore is not tied to a specific vendor. It uses the SQLAlchemy library, which allows many DB implementations to be used directly.
  • I agree we should not have a vendor-specific implementation in the SDK. The pattern we are following is to define the right interfaces/logic in the SDK and show a proof of concept with a specific vendor solution in the samples repo. This way you can verify your abstractions are valid for at least one vendor, and hopefully others can implement different versions for different vendors until the SDK abstractions are correct.

To that end, I think it makes sense to commit a ProducerConsumerQueueManager or similar, which provides the right logic and abstractions around using a distributed producer/consumer paradigm with the QueueManager interface. Then you can implement the Redis-based producer/consumer instance (in the samples repo) to prove the end-to-end behavior.
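
One way to read the proposal above is as a split between a vendor-neutral transport contract and a manager that holds the shared logic. The sketch below illustrates that split under stated assumptions: `ProducerConsumerQueueManager` is the name suggested in the comment, but `EventTransport`, `InMemoryTransport`, and the `tap`/`emit` method shapes are hypothetical and do not reflect the SDK's actual QueueManager interface.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List


class EventTransport(ABC):
    """Hypothetical contract that a backend (Redis, RabbitMQ, ...) would implement."""

    @abstractmethod
    async def publish(self, task_id: str, payload: str) -> None:
        """Deliver a payload to every subscriber of the task."""

    @abstractmethod
    async def subscribe(self, task_id: str) -> "asyncio.Queue[str]":
        """Return a queue that receives future payloads for the task."""


class InMemoryTransport(EventTransport):
    """Single-process backend, useful for tests and as a reference."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    async def publish(self, task_id: str, payload: str) -> None:
        for queue in self._subscribers.get(task_id, []):
            await queue.put(payload)

    async def subscribe(self, task_id: str) -> "asyncio.Queue[str]":
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(task_id, []).append(queue)
        return queue


class ProducerConsumerQueueManager:
    """Backend-agnostic logic that depends only on the EventTransport contract."""

    def __init__(self, transport: EventTransport) -> None:
        self._transport = transport

    async def tap(self, task_id: str) -> "asyncio.Queue[str]":
        return await self._transport.subscribe(task_id)

    async def emit(self, task_id: str, payload: str) -> None:
        await self._transport.publish(task_id, payload)
```

With this shape, a Redis-based transport could live in the samples repo and be passed to the manager's constructor, while the SDK itself ships only the contract and the shared logic.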

5 participants