@@ -11,6 +11,7 @@ use std::future::Future;
11
11
use std:: future:: pending;
12
12
use std:: pin:: Pin ;
13
13
use std:: sync:: Arc ;
14
+ use std:: sync:: OnceLock ;
14
15
15
16
use async_trait:: async_trait;
16
17
use hyperactor:: Actor ;
@@ -41,6 +42,7 @@ use serde_bytes::ByteBuf;
41
42
use tokio:: sync:: Mutex ;
42
43
use tokio:: sync:: oneshot;
43
44
45
+ use crate :: config:: SHARED_ASYNCIO_RUNTIME ;
44
46
use crate :: mailbox:: EitherPortRef ;
45
47
use crate :: mailbox:: PyMailbox ;
46
48
use crate :: proc:: InstanceWrapper ;
@@ -275,8 +277,20 @@ pub(super) struct PythonActor {
275
277
pub ( super ) actor : PyObject ,
276
278
277
279
/// Stores a reference to the Python event loop to run Python coroutines on.
278
- /// We give each PythonActor its own even loop in its own thread.
279
- task_locals : pyo3_async_runtimes:: TaskLocals ,
280
+ /// This is None when using single runtime mode, Some when using per-actor mode.
281
+ task_locals : Option < pyo3_async_runtimes:: TaskLocals > ,
282
+ }
283
+
284
+ impl PythonActor {
285
+ /// Get the TaskLocals to use for this actor.
286
+ /// Returns either the shared TaskLocals or this actor's own TaskLocals based on configuration.
287
+ fn get_task_locals ( & self , py : Python ) -> & pyo3_async_runtimes:: TaskLocals {
288
+ self . task_locals . as_ref ( ) . unwrap_or_else ( || {
289
+ // Use shared TaskLocals
290
+ static SHARED_TASK_LOCALS : OnceLock < pyo3_async_runtimes:: TaskLocals > = OnceLock :: new ( ) ;
291
+ Python :: allow_threads ( py, || SHARED_TASK_LOCALS . get_or_init ( create_task_locals) )
292
+ } )
293
+ }
280
294
}
281
295
282
296
#[ async_trait]
@@ -289,32 +303,36 @@ impl Actor for PythonActor {
289
303
let class_type: & Bound < ' _ , PyType > = unpickled. downcast ( ) ?;
290
304
let actor: PyObject = class_type. call0 ( ) ?. into_py_any ( py) ?;
291
305
292
- // Release the GIL so that the thread spawned below can acquire it.
293
- let task_locals = Python :: allow_threads ( py, || {
294
- let ( tx, rx) = std:: sync:: mpsc:: channel ( ) ;
295
- let _ = std:: thread:: spawn ( move || {
296
- Python :: with_gil ( |py| {
297
- let asyncio = Python :: import ( py, "asyncio" ) . unwrap ( ) ;
298
- let event_loop = asyncio. call_method0 ( "new_event_loop" ) . unwrap ( ) ;
299
- asyncio
300
- . call_method1 ( "set_event_loop" , ( event_loop. clone ( ) , ) )
301
- . unwrap ( ) ;
302
-
303
- let task_locals = pyo3_async_runtimes:: TaskLocals :: new ( event_loop. clone ( ) )
304
- . copy_context ( py)
305
- . unwrap ( ) ;
306
- tx. send ( task_locals) . unwrap ( ) ;
307
- event_loop. call_method0 ( "run_forever" ) . unwrap ( ) ;
308
- } ) ;
309
- } ) ;
310
- rx. recv ( ) . unwrap ( )
311
- } ) ;
306
+ // Only create per-actor TaskLocals if not using shared runtime
307
+ let task_locals = ( !hyperactor:: config:: global:: get ( SHARED_ASYNCIO_RUNTIME ) )
308
+ . then ( || Python :: allow_threads ( py, create_task_locals) ) ;
312
309
313
310
Ok ( Self { actor, task_locals } )
314
311
} ) ?)
315
312
}
316
313
}
317
314
315
+ /// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
316
+ fn create_task_locals ( ) -> pyo3_async_runtimes:: TaskLocals {
317
+ let ( tx, rx) = std:: sync:: mpsc:: channel ( ) ;
318
+ let _ = std:: thread:: spawn ( move || {
319
+ Python :: with_gil ( |py| {
320
+ let asyncio = Python :: import ( py, "asyncio" ) . unwrap ( ) ;
321
+ let event_loop = asyncio. call_method0 ( "new_event_loop" ) . unwrap ( ) ;
322
+ asyncio
323
+ . call_method1 ( "set_event_loop" , ( event_loop. clone ( ) , ) )
324
+ . unwrap ( ) ;
325
+
326
+ let task_locals = pyo3_async_runtimes:: TaskLocals :: new ( event_loop. clone ( ) )
327
+ . copy_context ( py)
328
+ . unwrap ( ) ;
329
+ tx. send ( task_locals) . unwrap ( ) ;
330
+ event_loop. call_method0 ( "run_forever" ) . unwrap ( ) ;
331
+ } ) ;
332
+ } ) ;
333
+ rx. recv ( ) . unwrap ( )
334
+ }
335
+
318
336
// [Panics in async endpoints]
319
337
// This class exists to solve a deadlock when an async endpoint calls into some
320
338
// Rust code that panics.
@@ -403,7 +421,7 @@ impl Handler<PythonMessage> for PythonActor {
403
421
} ;
404
422
405
423
pyo3_async_runtimes:: into_future_with_locals (
406
- & self . task_locals ,
424
+ self . get_task_locals ( py ) ,
407
425
awaitable. into_bound ( py) ,
408
426
)
409
427
. map_err ( |err| err. into ( ) )
0 commit comments