Skip to content

Commit e1daef5

Browse files
committed
Merge branch 'release/0.1.0'
2 parents 9c97625 + 2c89373 commit e1daef5

File tree

6 files changed

+135
-269
lines changed

6 files changed

+135
-269
lines changed

.github/workflows/test.yml

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,35 +26,3 @@ jobs:
2626
run: poetry install
2727
- name: Run lint check
2828
run: poetry run pre-commit run -a ${{ matrix.cmd }}
29-
pytest:
30-
permissions:
31-
checks: write
32-
pull-requests: write
33-
contents: write
34-
strategy:
35-
matrix:
36-
py_version: ["3.7", "3.8", "3.9", "3.10"]
37-
os: [ubuntu-latest, windows-latest]
38-
runs-on: "${{ matrix.os }}"
39-
steps:
40-
- uses: actions/checkout@v2
41-
- name: Install poetry
42-
run: pipx install poetry
43-
- name: Set up Python
44-
uses: actions/setup-python@v4
45-
with:
46-
python-version: "${{ matrix.py_version }}"
47-
cache: "poetry"
48-
- name: Install deps
49-
run: poetry install
50-
- name: Run pytest check
51-
run: poetry run pytest -vv -n auto --cov="taskiq_fastapi" .
52-
- name: Generate report
53-
run: poetry run coverage xml
54-
- name: Upload coverage reports to Codecov with GitHub Action
55-
uses: codecov/codecov-action@v3
56-
if: matrix.os == 'ubuntu-latest' && matrix.py_version == '3.9'
57-
with:
58-
token: ${{ secrets.CODECOV_TOKEN }}
59-
fail_ci_if_error: false
60-
verbose: true

README.md

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,113 @@
1-
# taskiq_fastapi
1+
# Taskiq + FastAPI
2+
3+
This repository has a code to integrate FastAPI with taskiq easily.
4+
5+
Taskiq and FastAPI both have dependencies and this library makes it possible to depend on
6+
`fastapi.Request` or `starlette.requests.HTTPConnection` in taskiq tasks.
7+
8+
With this library you can easily re-use your fastapi dependencies in taskiq functions.
9+
10+
## How does it work?
11+
12+
It adds startup functions to broker so it imports your fastapi application
13+
and creates a single worker-wide Request and HTTPConnection objects that you depend on.
14+
15+
THIS REQUEST IS NOT RELATED TO THE ACTUAL REQUESTS IN FASTAPI!
16+
This request won't have actual data about the request you were handling while sending task.
17+
18+
## Usage
19+
20+
Here we have an example of function that is being used by both taskiq's task and
21+
fastapi's handler function.
22+
23+
I have a script called `test_script.py` so my app can be found at `test_script:app`.
24+
We use strings to resolve application to bypass circular imports.
25+
26+
```python
27+
from fastapi import FastAPI, Request
28+
from pydantic import BaseModel
29+
from redis.asyncio import ConnectionPool, Redis
30+
from fastapi import Depends as FastAPIDepends
31+
from taskiq import TaskiqDepends
32+
import taskiq_fastapi
33+
from taskiq import ZeroMQBroker
34+
35+
broker = ZeroMQBroker()
36+
37+
app = FastAPI()
38+
39+
40+
@app.on_event("startup")
41+
async def app_startup():
42+
#####################
43+
# IMPORTANT NOTE #
44+
#####################
45+
# If you won't check that this is not
46+
# a worker process, you'll
47+
# create an infinite recursion. Because in worker processes
48+
# fastapi startup will be called.
49+
if not broker.is_worker_process:
50+
print("Starting broker")
51+
await broker.startup()
52+
print("Creating redis pool")
53+
app.state.redis_pool = ConnectionPool.from_url("redis://localhost")
54+
55+
56+
@app.on_event("shutdown")
57+
async def app_shutdown():
58+
#####################
59+
# IMPORTANT NOTE #
60+
#####################
61+
# If you won't check that this is not
62+
# a worker process, you'll
63+
# create an infinite recursion. Because in worker processes
64+
# fastapi startup will be called.
65+
if not broker.is_worker_process:
66+
print("Shutting down broker")
67+
await broker.shutdown()
68+
print("Stopping redis pool")
69+
await app.state.redis_pool.disconnect()
70+
71+
72+
# Here we call our magic function.
73+
taskiq_fastapi.init(broker, "test_script:app")
74+
75+
76+
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
77+
return request.app.state.redis_pool
78+
79+
80+
@broker.task
81+
async def my_redis_task(
82+
key: str,
83+
val: str,
84+
pool: ConnectionPool = TaskiqDepends(get_redis_pool),
85+
):
86+
async with Redis(connection_pool=pool) as redis:
87+
await redis.set(key, val)
88+
print("Value set.")
89+
90+
91+
class MyVal(BaseModel):
92+
key: str
93+
val: str
94+
95+
96+
@app.post("/val")
97+
async def setval_endpoint(val: MyVal) -> None:
98+
await my_redis_task.kiq(
99+
key=val.key,
100+
val=val.val,
101+
)
102+
print("Task sent")
103+
104+
105+
@app.get("/val")
106+
async def getval_endpoint(
107+
key: str,
108+
pool: ConnectionPool = FastAPIDepends(get_redis_pool),
109+
) -> str:
110+
async with Redis(connection_pool=pool, decode_responses=True) as redis:
111+
return await redis.get(key)
112+
113+
```

0 commit comments

Comments
 (0)