1
1
import asyncio
2
2
import inspect
3
- from typing import Awaitable , Callable
3
+ from typing import Any , Awaitable , Callable
4
4
5
5
import yarl
6
6
from aiohttp import web
16
16
def startup_event_generator (
17
17
broker : AsyncBroker ,
18
18
app_path : str ,
19
+ app : Any ,
19
20
) -> Callable [[TaskiqState ], Awaitable [None ]]:
20
21
"""
21
22
Creates an event to run on broker's startup.
@@ -27,26 +28,27 @@ def startup_event_generator(
27
28
act the same as the real application.
28
29
29
30
:param broker: current broker.
30
- :param app_path: string with a path to an application or a factory.
31
+ :param app_path: path to the application.
32
+ :param app: current application or a fractory.
31
33
32
34
:returns: a function that is called on startup.
33
35
"""
34
36
35
37
async def startup (state : TaskiqState ) -> None :
36
38
loop = asyncio .get_event_loop ()
37
39
38
- app = import_object ( app_path )
40
+ local_app = app
39
41
40
- if not isinstance (app , web .Application ):
41
- app = app ()
42
+ if not isinstance (local_app , web .Application ):
43
+ local_app = local_app ()
42
44
43
- if inspect .iscoroutine (app ):
44
- app = await app
45
+ if inspect .iscoroutine (local_app ):
46
+ local_app = await local_app
45
47
46
- if not isinstance (app , web .Application ):
47
- raise ValueError (f"' { app_path } ' is not an AioHTTP application." )
48
+ if not isinstance (local_app , web .Application ):
49
+ raise ValueError (f"{ app_path } is not an AioHTTP application." )
48
50
49
- handler = RequestHandler (app ._make_handler (), loop = loop )
51
+ handler = RequestHandler (local_app ._make_handler (), loop = loop )
50
52
handler .transport = asyncio .Transport ()
51
53
request = web .Request (
52
54
RawRequestMessage (
@@ -76,19 +78,19 @@ async def startup(state: TaskiqState) -> None:
76
78
match_dict = {},
77
79
route = SystemRoute (web .HTTPBadRequest ()),
78
80
)
79
- request ._match_info ._apps = app ._subapps
80
- request ._match_info ._current_app = app
81
+ request ._match_info ._apps = local_app ._subapps
82
+ request ._match_info ._current_app = local_app
81
83
82
84
broker .add_dependency_context (
83
85
{
84
- web .Application : app ,
86
+ web .Application : local_app ,
85
87
web .Request : request ,
86
88
},
87
89
)
88
90
89
- state .aiohttp_app = app
90
- app .router ._resources = []
91
- await app .startup ()
91
+ state .aiohttp_app = local_app
92
+ local_app .router ._resources = []
93
+ await local_app .startup ()
92
94
93
95
return startup
94
96
@@ -121,9 +123,11 @@ def init(broker: AsyncBroker, app_path: str) -> None:
121
123
if not broker .is_worker_process :
122
124
return
123
125
126
+ app = import_object (app_path )
127
+
124
128
broker .add_event_handler (
125
129
TaskiqEvents .WORKER_STARTUP ,
126
- startup_event_generator (broker , app_path ),
130
+ startup_event_generator (broker , app_path , app ),
127
131
)
128
132
broker .add_event_handler (
129
133
TaskiqEvents .WORKER_SHUTDOWN ,
0 commit comments