|
1 |
| -# taskiq_litestar |
| 1 | +# Taskiq Litestar |
| 2 | + |
| 3 | +This project adds integration with [Litestar](https://litestar.dev/) to [TaskIQ](https://taskiq-python.github.io/). |
| 4 | + |
| 5 | +Mainly this project focuses on running starup and shutdown events declared in your litestar app |
| 6 | +on worker nodes. This will allow you to access application's state and data from within your tasks. |
| 7 | + |
| 8 | +Also we add a few dependencies that you can depend on in your tasks. |
| 9 | +* `State` from `litestar.datastructures`; |
| 10 | +* `Litestar` from `litestar`. |
| 11 | + |
| 12 | +# Installation |
| 13 | + |
| 14 | +```bash |
| 15 | +pip install taskiq-litestar |
| 16 | +``` |
| 17 | + |
| 18 | +# Usage |
| 19 | + |
| 20 | +Here we have a script called `test_script.py` so the listestar app can be found at `test_script:app`. We use strings to resolve application to bypass circular imports. |
| 21 | + |
| 22 | +In the declared task I depend on a state. |
| 23 | + |
| 24 | +```python |
| 25 | +from contextlib import asynccontextmanager |
| 26 | + |
| 27 | +from litestar import Litestar, get |
| 28 | +from litestar.datastructures import State |
| 29 | +from taskiq import TaskiqDepends |
| 30 | +from taskiq_redis import ListQueueBroker |
| 31 | + |
| 32 | +import taskiq_litestar |
| 33 | + |
| 34 | +broker = ListQueueBroker("redis://localhost:6379/0") |
| 35 | + |
| 36 | +taskiq_litestar.init( |
| 37 | + broker, |
| 38 | + "test_script:app", |
| 39 | +) |
| 40 | + |
| 41 | + |
| 42 | +@asynccontextmanager |
| 43 | +async def app_lifespan(app: Litestar) -> None: |
| 44 | + """Lifespan generator.""" |
| 45 | + if not broker.is_worker_process: |
| 46 | + await broker.startup() |
| 47 | + |
| 48 | + app.state.value = "abc123" |
| 49 | + |
| 50 | + yield |
| 51 | + |
| 52 | + if not broker.is_worker_process: |
| 53 | + await broker.shutdown() |
| 54 | + |
| 55 | + |
| 56 | +@broker.task() |
| 57 | +async def my_task(state: State = TaskiqDepends()) -> None: |
| 58 | + """My task.""" |
| 59 | + print("a", state.dict()) # noqa: T201 |
| 60 | + |
| 61 | + |
| 62 | +@get("/") |
| 63 | +async def index() -> str: |
| 64 | + """Index get handler.""" |
| 65 | + await my_task.kiq() |
| 66 | + return "Task sent" |
| 67 | + |
| 68 | + |
| 69 | +app = Litestar([index], lifespan=[app_lifespan]) |
| 70 | +``` |
| 71 | + |
| 72 | +# Manually update dependency context |
| 73 | + |
| 74 | +When using InMemoryBroker you can manually update the dependency context. |
| 75 | +This might come handy when setting up tests. |
| 76 | + |
| 77 | +```python |
| 78 | +import taskiq_litestar |
| 79 | +from taskiq import InMemoryBroker |
| 80 | + |
| 81 | +broker = InMemoryBroker() |
| 82 | + |
| 83 | +app = FastAPI() |
| 84 | + |
| 85 | +taskiq_fastapi.init(broker, "test_script:app") |
| 86 | +taskiq_fastapi.populate_dependency_context(broker, app) |
| 87 | +``` |
0 commit comments