Skip to content

Commit bcd9d48

Browse files
authored
Add bg-service ex. to readme (#8)
1 parent 0175610 commit bcd9d48

File tree

3 files changed

+121
-15
lines changed

3 files changed

+121
-15
lines changed

README.md

Lines changed: 118 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,93 @@ Initialize PostgreSQL triggers to emit NOTIFY events on data changes. PGCacheWat
2525
pgcachewatch install <tables-to-cache>
2626
```
2727

28-
### FastAPI Example
29-
Example showing how to use PGCacheWatch for cache invalidation in a FastAPI app
28+
## Automating User Data Enrichment with PGCacheWatch and Asyncio
29+
30+
In the era of data-driven applications, keeping user information comprehensive and up-to-date is paramount. However, the challenge often lies in efficiently updating user profiles with additional information fetched from external sources, especially in response to new user registrations. This process can significantly benefit from automation, ensuring that every new user's data is enriched without manual intervention.
31+
32+
The following Python example leverages `PGCacheWatch` in conjunction with `asyncio` and `asyncpg` to automate the enrichment of new user data in a PostgreSQL database. By listening for new user events, the application fetches additional information asynchronously from simulated external REST APIs and updates the user's record. This seamless integration not only enhances data quality but also optimizes backend workflows by reducing the need for constant database polling.
33+
34+
### What This Example Covers
35+
36+
- **Listening for New User Registrations**: Utilizing `PGCacheWatch` to listen for new user events in a PostgreSQL database, triggering data enrichment processes.
37+
- **Fetching Additional Information**: Simulating asynchronous calls to external REST APIs to fetch additional information for newly registered users.
38+
- **Updating User Profiles**: Demonstrating how to update user records in the database with the fetched information, completing the data enrichment cycle.
39+
40+
This guide is intended for developers seeking to automate data enrichment processes in their applications, particularly those using PostgreSQL for data management. The example provides a practical approach to integrating real-time event handling with asynchronous programming for efficient data updates.
41+
42+
```python
43+
import asyncio
44+
import asyncpg
45+
from pgcachewatch import listeners, models
46+
47+
async def fetch_users_without_additional_user_info() -> list:
48+
"""
49+
Fetches a list of users who do not yet have additional user information associated.
50+
"""
51+
...
52+
53+
async def update_users_without_additional_user_info(
54+
user_id: int,
55+
additional_user_info: dict,
56+
) -> None:
57+
"""
58+
Updates users with the additional information fetched from an external source.
59+
"""
60+
...
61+
62+
async def fetch_additional_user_info(user_id: int) -> dict:
63+
"""
64+
Simulates fetching additional user information via REST APIs.
65+
Note: This is a mock function. In a real application, this would make an asynchronous
66+
API call to fetch information from an external service.
67+
"""
68+
await asyncio.sleep(1) # Simulate API call delay
69+
return {"info": "Additional info for user"} # Example return value
70+
71+
async def process_new_user_event() -> None:
72+
"""
73+
Processes new user events by fetching additional information for new users
74+
and updating their records.
75+
"""
76+
new_users = await fetch_users_without_additional_user_info()
77+
for user_id in new_users:
78+
user_info = await fetch_additional_user_info(user_id)
79+
await update_users_without_additional_user_info(user_id, user_info)
80+
81+
async def listen_for_new_users() -> None:
82+
"""
83+
Listens for new user events and processes each event as it arrives.
84+
85+
This function establishes a connection to the database and listens on a specified
86+
channel for new user events. When a new user is added (detected via an "insert" operation),
87+
it triggers the processing of new user events to fetch and update additional information.
88+
"""
89+
conn = await asyncpg.connect() # Connect to your PostgreSQL database
90+
listener = listeners.PGEventQueue()
91+
await listener.connect(conn)
92+
93+
try:
94+
print("Listening for new user events...")
95+
async for event in listener.aiter():
96+
if event.operation == "insert":
97+
await process_new_user_event()
98+
finally:
99+
await conn.close()
100+
101+
if __name__ == "__main__":
102+
asyncio.run(listen_for_new_users())
103+
```
104+
105+
## Integrating PGCacheWatch with FastAPI for Dynamic Cache Invalidation
106+
In modern web applications, maintaining data consistency while ensuring high performance can be a significant challenge. Caching is a common strategy to enhance performance, but it introduces complexity when it comes to invalidating cached data upon updates. `PGCacheWatch` offers a robust solution by leveraging PostgreSQL's NOTIFY/LISTEN features to invalidate cache entries in real-time, ensuring your application's data remains fresh and consistent.
107+
108+
This example demonstrates how to integrate `PGCacheWatch` with FastAPI, a popular asynchronous web framework, to create an efficient and responsive web application. By combining FastAPI's scalability with `PGCacheWatch`'s real-time cache invalidation capabilities, developers can build applications that automatically update cached data upon changes in the database, minimizing latency and improving user experience.
109+
110+
### What You'll Learn
111+
112+
- **Setting Up `PGCacheWatch` with FastAPI**: How to configure `PGCacheWatch` to work within the FastAPI application lifecycle, including database connection setup and teardown.
113+
- **Implementing Cache Invalidation Strategies**: Utilizing `PGCacheWatch`'s decorators and strategies to invalidate cached data based on database events, specifically focusing on updates.
114+
- **Creating Responsive Endpoints**: Building FastAPI routes that serve dynamically updated data, ensuring that the information presented to the user is always up-to-date.
30115

31116
```python
32117
import contextlib
@@ -36,33 +121,53 @@ import asyncpg
36121
from fastapi import FastAPI
37122
from pgcachewatch import decorators, listeners, models, strategies
38123

124+
# Initialize a PGEventQueue listener to listen for database events.
39125
listener = listeners.PGEventQueue()
40126

41-
42127
@contextlib.asynccontextmanager
43128
async def app_setup_teardown(_: FastAPI) -> typing.AsyncGenerator[None, None]:
44-
conn = await asyncpg.connect()
45-
await listener.connect(conn, models.PGChannel("ch_pgcachewatch_table_change"))
46-
yield
47-
await conn.close()
129+
"""
130+
Asynchronous context manager for FastAPI app setup and teardown.
48131
132+
This context manager is used to establish and close the database connection
133+
at the start and end of the FastAPI application lifecycle, respectively.
134+
"""
135+
# Establish a database connection using asyncpg.
136+
conn = await asyncpg.connect()
137+
# Connect the listener to the database using the specified channel.
138+
await listener.connect(conn)
139+
yield # Yield control back to the event loop.
140+
await conn.close() # Ensure the database connection is closed on app teardown.
49141

142+
# Create an instance of FastAPI, specifying the app setup and teardown actions.
50143
APP = FastAPI(lifespan=app_setup_teardown)
51144

52-
53-
# Only allow for cache refresh after an update
145+
# Decorate the cached_query function with cache invalidation logic.
54146
@decorators.cache(
55-
strategy=strategies.Gready(
147+
strategy=strategies.Greedy( # Note: Assuming 'Gready' is a typo, corrected to 'Greedy'.
56148
listener=listener,
149+
# Invalidate the cache only for 'update' operations on the database.
57150
predicate=lambda x: x.operation == "update",
58151
)
59152
)
60153
async def cached_query() -> dict[str, str]:
61-
# Simulate a database query
154+
"""
155+
Simulates a database query that benefits from cache invalidation.
156+
157+
This function is decorated to use PGCacheWatch's cache invalidation, ensuring
158+
that the data returned is up-to-date following any relevant 'update' operations
159+
on the database.
160+
"""
161+
# Return a mock data response.
62162
return {"data": "query result"}
63163

64-
164+
# Define a FastAPI route to fetch data, utilizing the cached_query function.
65165
@APP.get("/data")
66166
async def get_data() -> dict:
167+
"""
168+
This endpoint uses the cached_query function to return data, demonstrating
169+
how cache invalidation can be integrated into a web application route.
170+
"""
171+
# Fetch and return the data using the cached query function.
67172
return await cached_query()
68-
```
173+
```

src/pgcachewatch/listeners.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,11 @@ def parse_and_put(
9393
parsed = models.Event.model_validate(
9494
json.loads(payload) | {"channel": channel}
9595
)
96-
if parsed.latency > self._max_latency:
97-
logging.warning("Latency for %s above %s.", parsed, self._max_latency)
9896
except Exception:
9997
logging.exception("Unable to parse `%s`.", payload)
10098
else:
99+
if parsed.latency > self._max_latency:
100+
logging.warning("Latency for %s above %s.", parsed, self._max_latency)
101101
logging.info("Received event: %s on %s", parsed, channel)
102102
try:
103103
self.put_nowait(parsed)

src/pgcachewatch/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"update",
99
"delete",
1010
]
11+
1112
PGChannel = typing.NewType(
1213
"PGChannel",
1314
str,

0 commit comments

Comments
 (0)