21
21
22
22
23
23
class BlockingTrioPortal :
24
- """A portal that synchronous threads can reach through to run code in the
25
- Trio thread.
26
-
27
- Most Trio functions can only be called from the Trio thread, which is
28
- sometimes annoying. What if you really need to call a Trio function from a
29
- worker thread? That's where :class:`BlockingTrioPortal` comes in: it's the
30
- rare Trio object whose methods can – in fact, must! – be called from
31
- another thread, and it allows you to call all those other functions.
32
-
33
- There is one complication: it's possible for a single Python program to
34
- contain multiple calls to :func:`trio.run`, either in sequence – like in a
35
- test suite that calls :func:`trio.run` for each test – or simultaneously
36
- in different threads. So how do you control which :func:`trio.run` your
37
- portal opens into?
38
-
39
- The answer is that each :class:`BlockingTrioPortal` object is associated
40
- with one *specific* call to :func:`trio.run`.
41
-
42
- The simplest way to set this up is to instantiate the class with no
43
- arguments inside Trio; this automatically binds it to the context where
44
- you instantiate it::
45
-
46
- async def some_function():
47
- portal = trio.BlockingTrioPortal()
48
- await trio.run_sync_in_thread(sync_fn, portal)
49
-
50
- Alternatively, you can pass an explicit :class:`trio.hazmat.TrioToken` to
51
- specify the :func:`trio.run` that you want your portal to connect to.
52
-
53
- """
54
-
55
24
def __init__ (self , trio_token = None ):
56
25
if trio_token is None :
57
26
trio_token = trio .hazmat .current_trio_token ()
58
27
self ._trio_token = trio_token
59
28
60
- # This is the part that runs in the Trio thread
61
- def _run_cb (self , q , afn , args ):
62
- @disable_ki_protection
63
- async def unprotected_afn ():
64
- return await afn (* args )
65
-
66
- async def await_in_trio_thread_task ():
67
- q .put_nowait (await outcome .acapture (unprotected_afn ))
68
-
69
- trio .hazmat .spawn_system_task (await_in_trio_thread_task , name = afn )
70
-
71
- # This is the part that runs in the Trio thread
72
- def _run_sync_cb (self , q , fn , args ):
73
- @disable_ki_protection
74
- def unprotected_fn ():
75
- return fn (* args )
76
-
77
- res = outcome .capture (unprotected_fn )
78
- q .put_nowait (res )
79
-
80
- def _do_it (self , cb , fn , * args ):
81
- try :
82
- trio .hazmat .current_task ()
83
- except RuntimeError :
84
- pass
85
- else :
86
- raise RuntimeError (
87
- "this is a blocking function; call it from a thread"
88
- )
89
- q = stdlib_queue .Queue ()
90
- self ._trio_token .run_sync_soon (cb , q , fn , args )
91
- return q .get ().unwrap ()
92
-
93
29
def run (self , afn , * args ):
94
- """Run the given async function in the Trio thread, blocking until it
95
- is complete.
96
-
97
- Returns or raises whatever the given function returns or raises. It
98
- can also raise exceptions of its own:
99
-
100
- Raises:
101
- RunFinishedError: if the corresponding call to :func:`trio.run` has
102
- already completed.
103
- Cancelled: if the corresponding call to :func:`trio.run` completes
104
- while ``afn(*args)`` is running, then ``afn`` is likely to raise
105
- :class:`Cancelled`, and this will propagate out into
106
- RuntimeError: if you try calling this from inside the Trio thread,
107
- which would otherwise cause a deadlock.
108
-
109
- """
110
- return self ._do_it (self ._run_cb , afn , * args )
30
+ return run (afn , * args , trio_token = self ._trio_token )
111
31
112
32
def run_sync (self , fn , * args ):
113
- """Run the given synchronous function in the Trio thread, blocking
114
- until it is complete.
115
-
116
- Returns or raises whatever the given function returns or raises. It
117
- can also exceptions of its own:
118
-
119
- Raises:
120
- RunFinishedError: if the corresponding call to :func:`trio.run` has
121
- already completed.
122
- RuntimeError: if you try calling this from inside the Trio thread,
123
- which would otherwise cause a deadlock.
124
-
125
- """
126
- return self ._do_it (self ._run_sync_cb , fn , * args )
33
+ return run_sync (fn , * args , trio_token = self ._trio_token )
127
34
128
35
129
36
################################################################
@@ -469,13 +376,13 @@ def run(afn, *args, trio_token=None):
469
376
This happens when it was not spawned from trio.run_sync_in_thread.
470
377
471
378
**Locating a Trio Token**: There are two ways to specify which
472
- :func: `` trio.run()`` loop to reenter::
379
+ ` trio.run()` loop to reenter::
473
380
474
- - Spawn this thread from :func: `` run_sync_in_thread` `. This will
381
+ - Spawn this thread from ` run_sync_in_thread`. This will
475
382
"inject" the current Trio Token into thread local storage and allow
476
- this function to re-enter the same :func: `` trio.run()` ` loop.
383
+ this function to re-enter the same ` trio.run` loop.
477
384
- Pass a keyword argument, ``trio_token`` specifiying a specific
478
- :func: `` trio.run()` ` loop to re-enter. This is the "legacy" way of
385
+ ` trio.run` loop to re-enter. This is the "legacy" way of
479
386
re-entering a trio thread and is similar to the old
480
387
`BlockingTrioPortal`.
481
388
"""
@@ -504,9 +411,9 @@ def run_sync(fn, *args, trio_token=None):
504
411
can also raise exceptions of its own:
505
412
506
413
Raises:
507
- RunFinishedError: if the corresponding call to :func: `trio.run` has
414
+ RunFinishedError: if the corresponding call to `trio.run` has
508
415
already completed.
509
- Cancelled: if the corresponding call to :func: `trio.run` completes
416
+ Cancelled: if the corresponding call to `trio.run` completes
510
417
while ``afn(*args)`` is running, then ``afn`` is likely to raise
511
418
:class:`Cancelled`, and this will propagate out into
512
419
RuntimeError: if you try calling this from inside the Trio thread,
@@ -515,13 +422,13 @@ def run_sync(fn, *args, trio_token=None):
515
422
This happens when it was not spawned from trio.run_sync_in_thread.
516
423
517
424
**Locating a Trio Token**: There are two ways to specify which
518
- :func: `` trio.run()` ` loop to reenter::
425
+ ` trio.run` loop to reenter::
519
426
520
- - Spawn this thread from :func: `` run_sync_in_thread` `. This will
427
+ - Spawn this thread from ` run_sync_in_thread`. This will
521
428
"inject" the current Trio Token into thread local storage and allow
522
- this function to re-enter the same :func: `` trio.run()` ` loop.
429
+ this function to re-enter the same ` trio.run` loop.
523
430
- Pass a keyword argument, ``trio_token`` specifiying a specific
524
- :func: `` trio.run()` ` loop to re-enter. This is the "legacy" way of
431
+ ` trio.run()` loop to re-enter. This is the "legacy" way of
525
432
re-entering a trio thread and is similar to the old
526
433
`BlockingTrioPortal`.
527
434
"""
0 commit comments