Skip to content

Commit da57488

Browse files
authored
Add EventQueueProtocol (#9)
1 parent 49e7d86 commit da57488

File tree

4 files changed

+62
-23
lines changed

4 files changed

+62
-23
lines changed

src/pgcachewatch/decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def outer(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
1919

2020
async def inner(*args: P.args, **kwargs: P.kwargs) -> T:
2121
# If db-conn is down, disable cache.
22-
if not strategy.pg_connection_healthy():
22+
if not strategy.connection_healthy():
2323
logging.critical("Database connection is closed, caching disabled.")
2424
return await fn(*args, **kwargs)
2525

src/pgcachewatch/listeners.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import datetime
33
import json
44
import logging
5+
from typing import Protocol
56

67
import asyncpg
78

@@ -15,6 +16,44 @@ def _critical_termination_listener(*_: object, **__: object) -> None:
1516
logging.critical("Connection is closed / terminated.")
1617

1718

19+
class EventQueueProtocol(Protocol):
20+
"""
21+
Protocol for an event queue interface.
22+
23+
Specifies the required methods for an event queue to check the connection health
24+
and to retrieve events without waiting. Implementing classes must provide concrete
25+
implementations of these methods to ensure compatibility with the event handling
26+
system.
27+
"""
28+
29+
def connection_healthy(self) -> bool:
30+
"""
31+
Checks if the connection is healthy.
32+
33+
This method should return True if the connection to the underlying service
34+
(e.g., database, message broker) is active and healthy, False otherwise.
35+
36+
Returns:
37+
bool: True if the connection is healthy, False otherwise.
38+
"""
39+
raise NotImplementedError
40+
41+
def get_nowait(self) -> models.Event:
42+
"""
43+
Retrieves an event from the queue without waiting.
44+
45+
Attempts to immediately retrieve an event from the queue. If no event is
46+
available, this method should raise an appropriate exception (e.g., QueueEmpty).
47+
48+
Returns:
49+
models.Event: The event retrieved from the queue.
50+
51+
Raises:
52+
QueueEmpty: If no event is available in the queue to retrieve.
53+
"""
54+
raise NotImplementedError
55+
56+
1857
class PGEventQueue(asyncio.Queue[models.Event]):
1958
"""
2059
A PostgreSQL event queue that listens to a specified
@@ -34,7 +73,7 @@ def __init__(
3473
async def connect(
3574
self,
3675
connection: asyncpg.Connection,
37-
channel: models.PGChannel,
76+
channel: models.PGChannel = models.PGChannel("ch_pgcachewatch_table_change"),
3877
) -> None:
3978
"""
4079
Asynchronously connects the PGEventQueue to a specified
@@ -104,5 +143,5 @@ def parse_and_put(
104143
except Exception:
105144
logging.exception("Unable to queue `%s`.", parsed)
106145

107-
def pg_connection_healthy(self) -> bool:
146+
def connection_healthy(self) -> bool:
108147
return bool(self._pg_connection and not self._pg_connection.is_closed())

src/pgcachewatch/strategies.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class Strategy(typing.Protocol):
1313
def clear(self) -> bool:
1414
raise NotImplementedError
1515

16-
def pg_connection_healthy(self) -> bool:
16+
def connection_healthy(self) -> bool:
1717
raise NotImplementedError
1818

1919

@@ -24,22 +24,22 @@ class Gready(Strategy):
2424

2525
def __init__(
2626
self,
27-
listener: listeners.PGEventQueue,
28-
deadline: models.DeadlineSetting = models.DeadlineSetting(),
27+
listener: listeners.EventQueueProtocol,
28+
settings: models.DeadlineSetting = models.DeadlineSetting(),
2929
predicate: typing.Callable[[models.Event], bool] = bool,
3030
) -> None:
3131
super().__init__()
3232
self._listener = listener
3333
self._predicate = predicate
34-
self._deadline = deadline
34+
self._settings = settings
3535

36-
def pg_connection_healthy(self) -> bool:
37-
return self._listener.pg_connection_healthy()
36+
def connection_healthy(self) -> bool:
37+
return self._listener.connection_healthy()
3838

3939
def clear(self) -> bool:
4040
for current in utils.pick_until_deadline(
4141
self._listener,
42-
settings=self._deadline,
42+
settings=self._settings,
4343
):
4444
if self._predicate(current):
4545
return True
@@ -54,23 +54,23 @@ class Windowed(Strategy):
5454

5555
def __init__(
5656
self,
57-
listener: listeners.PGEventQueue,
57+
listener: listeners.EventQueueProtocol,
5858
window: list[models.OPERATIONS],
59-
deadline: models.DeadlineSetting = models.DeadlineSetting(),
59+
settings: models.DeadlineSetting = models.DeadlineSetting(),
6060
) -> None:
6161
super().__init__()
6262
self._listener = listener
6363
self._window = window
64-
self._deadline = deadline
64+
self._settings = settings
6565
self._events = collections.deque[models.OPERATIONS](maxlen=len(self._window))
6666

67-
def pg_connection_healthy(self) -> bool:
68-
return self._listener.pg_connection_healthy()
67+
def connection_healthy(self) -> bool:
68+
return self._listener.connection_healthy()
6969

7070
def clear(self) -> bool:
7171
for current in utils.pick_until_deadline(
7272
self._listener,
73-
settings=self._deadline,
73+
settings=self._settings,
7474
):
7575
self._events.append(current.operation)
7676
if len(self._window) == len(self._events) and all(
@@ -87,23 +87,23 @@ class Timed(Strategy):
8787

8888
def __init__(
8989
self,
90-
listener: listeners.PGEventQueue,
90+
listener: listeners.EventQueueProtocol,
9191
timedelta: datetime.timedelta,
92-
deadline: models.DeadlineSetting = models.DeadlineSetting(),
92+
settings: models.DeadlineSetting = models.DeadlineSetting(),
9393
) -> None:
9494
super().__init__()
9595
self._listener = listener
9696
self._timedelta = timedelta
97-
self._deadline = deadline
97+
self._settings = settings
9898
self._previous = datetime.datetime.now(tz=datetime.timezone.utc)
9999

100-
def pg_connection_healthy(self) -> bool:
101-
return self._listener.pg_connection_healthy()
100+
def connection_healthy(self) -> bool:
101+
return self._listener.connection_healthy()
102102

103103
def clear(self) -> bool:
104104
for current in utils.pick_until_deadline(
105105
queue=self._listener,
106-
settings=self._deadline,
106+
settings=self._settings,
107107
):
108108
if current.sent_at - self._previous > self._timedelta:
109109
self._previous = current.sent_at

src/pgcachewatch/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ async def emit_event(
3434

3535

3636
def pick_until_deadline(
37-
queue: listeners.PGEventQueue,
37+
queue: listeners.EventQueueProtocol,
3838
settings: models.DeadlineSetting,
3939
) -> typing.Iterator[models.Event]:
4040
"""

0 commit comments

Comments
 (0)