Skip to content

Commit e5d99ee

Browse files
authored
feat: Support for Database based Push Config Store (#299)
# Description Introduces support for Database backed Push Notification Config Store. Push Notification Config persisted to the database table gets encrypted when encryption_key param is set for the DatabasePushNotificationConfigStore. Usage: ```py httpx_client = httpx.AsyncClient() engine = create_async_engine( "sqlite+aiosqlite:///file::memory:?cache=shared", echo=False ) push_config_store = DatabasePushNotificationConfigStore(engine=engine, encryption_key=Fernet.generate_key()) push_sender = BasePushNotificationSender( httpx_client=httpx_client, config_store=push_config_store) request_handler = DefaultRequestHandler( agent_executor=CurrencyAgentExecutor(), task_store=DatabaseTaskStore(engine=engine), push_config_store=push_config_store, push_sender=push_sender) ```
1 parent dc95e2a commit e5d99ee

File tree

11 files changed

+1081
-22
lines changed

11 files changed

+1081
-22
lines changed

.github/actions/spelling/allow.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ drivername
3838
dunders
3939
euo
4040
excinfo
41+
fernet
4142
fetchrow
4243
fetchval
4344
genai

.github/workflows/unit-tests.yml

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@ jobs:
1515
postgres:
1616
image: postgres:15-alpine
1717
env:
18-
POSTGRES_USER: postgres
19-
POSTGRES_PASSWORD: postgres
18+
POSTGRES_USER: a2a
19+
POSTGRES_PASSWORD: a2a_password
2020
POSTGRES_DB: a2a_test
2121
ports:
2222
- 5432:5432
23+
options: >-
24+
--health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
25+
mysql:
26+
image: mysql:8.0
27+
env:
28+
MYSQL_ROOT_PASSWORD: root
29+
MYSQL_DATABASE: a2a_test
30+
MYSQL_USER: a2a
31+
MYSQL_PASSWORD: a2a_password
32+
ports:
33+
- 3306:3306
34+
options: >-
35+
--health-cmd="mysqladmin ping -h localhost -u root -proot" --health-interval=10s --health-timeout=5s --health-retries=5
2336
2437
strategy:
2538
matrix:
@@ -31,19 +44,18 @@ jobs:
3144
uses: actions/setup-python@v5
3245
with:
3346
python-version: ${{ matrix.python-version }}
34-
- name: Set postgres for tests
35-
run: |
36-
sudo apt-get update && sudo apt-get install -y postgresql-client
37-
PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d a2a_test -f ${{ github.workspace }}/tests/docker/postgres/init.sql
38-
export POSTGRES_TEST_DSN="postgresql+asyncpg://postgres:postgres@localhost:5432/a2a_test"
47+
- name: Set up test environment variables
48+
run: |
49+
echo "POSTGRES_TEST_DSN=postgresql+asyncpg://a2a:a2a_password@localhost:5432/a2a_test" >> $GITHUB_ENV
50+
echo "MYSQL_TEST_DSN=mysql+aiomysql://a2a:a2a_password@localhost:3306/a2a_test" >> $GITHUB_ENV
3951
4052
- name: Install uv
4153
uses: astral-sh/setup-uv@v6
4254
- name: Add uv to PATH
4355
run: |
4456
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
4557
- name: Install dependencies
46-
run: uv sync --dev --extra sql
58+
run: uv sync --dev --extra sql --extra encryption
4759
- name: Run tests and check coverage
4860
run: uv run pytest --cov=a2a --cov-report=xml --cov-fail-under=89
4961
- name: Show coverage summary in log

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ postgresql = ["sqlalchemy[asyncio,postgresql-asyncpg]>=2.0.0"]
4141
mysql = ["sqlalchemy[asyncio,aiomysql]>=2.0.0"]
4242
sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"]
4343
sql = ["sqlalchemy[asyncio,postgresql-asyncpg,aiomysql,aiosqlite]>=2.0.0"]
44+
encryption = ["cryptography>=43.0.0"]
4445

4546
[project.urls]
4647
homepage = "https://a2aproject.github.io/A2A/"

src/a2a/server/models.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def override(func): # noqa: ANN001, ANN201
1616

1717

1818
try:
19-
from sqlalchemy import JSON, Dialect, String
19+
from sqlalchemy import JSON, Dialect, LargeBinary, String
2020
from sqlalchemy.orm import (
2121
DeclarativeBase,
2222
Mapped,
@@ -208,3 +208,58 @@ class TaskModel(TaskMixin, Base):
208208
"""Default task model with standard table name."""
209209

210210
__tablename__ = 'tasks'
211+
212+
213+
# PushNotificationConfigMixin that can be used with any table name
214+
class PushNotificationConfigMixin:
215+
"""Mixin providing standard push notification config columns."""
216+
217+
task_id: Mapped[str] = mapped_column(String(36), primary_key=True)
218+
config_id: Mapped[str] = mapped_column(String(255), primary_key=True)
219+
config_data: Mapped[bytes] = mapped_column(LargeBinary, nullable=False)
220+
221+
@override
222+
def __repr__(self) -> str:
223+
"""Return a string representation of the push notification config."""
224+
repr_template = '<{CLS}(task_id="{TID}", config_id="{CID}")>'
225+
return repr_template.format(
226+
CLS=self.__class__.__name__,
227+
TID=self.task_id,
228+
CID=self.config_id,
229+
)
230+
231+
232+
def create_push_notification_config_model(
233+
table_name: str = 'push_notification_configs',
234+
base: type[DeclarativeBase] = Base,
235+
) -> type:
236+
"""Create a PushNotificationConfigModel class with a configurable table name."""
237+
238+
class PushNotificationConfigModel(PushNotificationConfigMixin, base):
239+
__tablename__ = table_name
240+
241+
@override
242+
def __repr__(self) -> str:
243+
"""Return a string representation of the push notification config."""
244+
repr_template = '<PushNotificationConfigModel[{TABLE}](task_id="{TID}", config_id="{CID}")>'
245+
return repr_template.format(
246+
TABLE=table_name,
247+
TID=self.task_id,
248+
CID=self.config_id,
249+
)
250+
251+
PushNotificationConfigModel.__name__ = (
252+
f'PushNotificationConfigModel_{table_name}'
253+
)
254+
PushNotificationConfigModel.__qualname__ = (
255+
f'PushNotificationConfigModel_{table_name}'
256+
)
257+
258+
return PushNotificationConfigModel
259+
260+
261+
# Default PushNotificationConfigModel for backward compatibility
262+
class PushNotificationConfigModel(PushNotificationConfigMixin, Base):
263+
"""Default push notification config model with standard table name."""
264+
265+
__tablename__ = 'push_notification_configs'

src/a2a/server/tasks/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,31 @@ def __init__(self, *args, **kwargs):
4343
) from _original_error
4444

4545

46+
try:
47+
from a2a.server.tasks.database_push_notification_config_store import (
48+
DatabasePushNotificationConfigStore, # type: ignore
49+
)
50+
except ImportError as e:
51+
_original_error = e
52+
# If the database push notification config store is not available, we can still use in-memory stores.
53+
logger.debug(
54+
'DatabasePushNotificationConfigStore not loaded. This is expected if database dependencies are not installed. Error: %s',
55+
e,
56+
)
57+
58+
class DatabasePushNotificationConfigStore: # type: ignore
59+
"""Placeholder for DatabasePushNotificationConfigStore when dependencies are not installed."""
60+
61+
def __init__(self, *args, **kwargs):
62+
raise ImportError(
63+
'To use DatabasePushNotificationConfigStore, its dependencies must be installed. '
64+
'You can install them with \'pip install "a2a-sdk[sql]"\''
65+
) from _original_error
66+
67+
4668
__all__ = [
4769
'BasePushNotificationSender',
70+
'DatabasePushNotificationConfigStore',
4871
'DatabaseTaskStore',
4972
'InMemoryPushNotificationConfigStore',
5073
'InMemoryTaskStore',

src/a2a/server/tasks/base_push_notification_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ async def _dispatch_notification(
5858
response = await self._client.post(
5959
url,
6060
json=task.model_dump(mode='json', exclude_none=True),
61-
headers=headers
61+
headers=headers,
6262
)
6363
response.raise_for_status()
6464
logger.info(

0 commit comments

Comments
 (0)