Skip to content

Commit 7f36105

Browse files
committed
[trio.from_thread]
This addressses issue #810 by implementing `trio.from_thread.run` and `trio.from_thread.run_sync` and deprecating `BlockingTrioPortal`. Support re-entering the same `Trio.run()` loop by stashing the current trio token in Thread Local Storage right before `run_sync_in_thread` spawns a worker thread. When `trio.from_thread.run_sync` or `trio.from_thread.run` are called in this thread, they can access the token and use it to re-enter the `Trio.run()` loop. This commit deprecates `BlockingTrioPortal`. For the majority of use cases, using the thread local Trio Token should be sufficient. If for any reason, another special Trio Token needs to be used, it can be passed as a kwarg to `from_thread.run` and `from_thread.run_sync`. Here is a snippet from how the new API works: ```python3 import trio def thread_fn(): start = trio.from_thread.run_sync(trio.current_time) print("In Trio-land, the time is now:", start) trio.from_thread.run(trio.sleep, 1) end = trio.from_thread.run_sync(trio.current_time) print("And now it's:", end) async def main(): await trio.run_sync_in_thread(thread_fn) trio.run(main) ``` Here is how the same code works in "old" Trio: ```python3 import trio def thread_fn(token): portal = trio.BlockingTrioPortal(token) start = portal.run_sync(trio.current_time) print("In Trio-land, the time is now:", start) portal.run(trio.sleep, 1) end = portal.run_sync(trio.current_time) print("And now it's:", end) async def main(): token = trio.hazmat.current_trio_token() await trio.run_sync_in_thread(thread_fn, token) trio.run(main) ```
1 parent 7d0bb60 commit 7f36105

File tree

4 files changed

+224
-3
lines changed

4 files changed

+224
-3
lines changed

trio/__init__.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@
3232
Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition
3333
)
3434

35-
from ._threads import (
36-
run_sync_in_thread, current_default_thread_limiter, BlockingTrioPortal
37-
)
35+
from ._threads import (run_sync_in_thread, current_default_thread_limiter)
36+
from ._threads import BlockingTrioPortal as _BlockingTrioPortal
3837

3938
from ._highlevel_generic import aclose_forcefully, StapledStream
4039

@@ -70,6 +69,7 @@
7069
from . import hazmat
7170
from . import socket
7271
from . import abc
72+
from . import from_thread
7373
# Not imported by default: testing
7474
if False:
7575
from . import testing
@@ -114,6 +114,13 @@
114114
issue=810,
115115
instead=current_default_thread_limiter,
116116
),
117+
"BlockingTrioPortal":
118+
_deprecate.DeprecatedAttribute(
119+
_BlockingTrioPortal,
120+
"0.12.0",
121+
issue=810,
122+
instead=("trio.from_thread.run(), trio.from_thread.run_sync()")
123+
),
117124
}
118125

119126
# Having the public path in .__module__ attributes is important for:

trio/_threads.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,12 @@ async def run_sync_in_thread(sync_fn, *args, cancellable=False, limiter=None):
269269
tasks to continue working while ``sync_fn`` runs. This is accomplished by
270270
pushing the call to ``sync_fn(*args)`` off into a worker thread.
271271
272+
``run_sync_in_thread`` also injects the current ``TrioToken`` into the
273+
spawned thread's local storage so that these threads can re-enter the Trio
274+
loop by calling either :func: ``trio.from_thread.run`` or
275+
:func: ``trio.from_thread.run_sync`` for async or synchronous functions,
276+
respectively.
277+
272278
Args:
273279
sync_fn: An arbitrary synchronous callable.
274280
*args: Positional arguments to pass to sync_fn. If you need keyword
@@ -385,6 +391,7 @@ def worker_thread_fn():
385391
thread = threading.Thread(
386392
target=worker_thread_fn, name=name, daemon=True
387393
)
394+
setattr(thread, 'current_trio_token', trio.hazmat.current_trio_token())
388395
thread.start()
389396
except:
390397
limiter.release_on_behalf_of(placeholder)
@@ -398,3 +405,118 @@ def abort(_):
398405
return trio.hazmat.Abort.FAILED
399406

400407
return await trio.hazmat.wait_task_rescheduled(abort)
408+
409+
410+
def _run_fn_as_system_task(cb, fn, *args, trio_token=None):
411+
"""Helper function for from_thread.run and from_thread.run_sync.
412+
413+
Since this internally uses TrioToken.run_sync_soon, all warnings about
414+
raised exceptions canceling all tasks should be noted.
415+
416+
"""
417+
if not trio_token:
418+
current_thread = threading.current_thread()
419+
trio_token = getattr(current_thread, 'current_trio_token')
420+
421+
try:
422+
trio.hazmat.current_task()
423+
except RuntimeError:
424+
pass
425+
else:
426+
raise RuntimeError(
427+
"this is a blocking function; call it from a thread"
428+
)
429+
430+
q = stdlib_queue.Queue()
431+
trio_token.run_sync_soon(cb, q, fn, args)
432+
return q.get().unwrap()
433+
434+
435+
def run(afn, *args, trio_token=None):
436+
"""Run the given async function in the parent Trio thread, blocking until it
437+
is complete.
438+
439+
Returns:
440+
Whatever ``afn(*args)`` returns.
441+
442+
Returns or raises whatever the given function returns or raises. It
443+
can also raise exceptions of its own:
444+
445+
Raises:
446+
RunFinishedError: if the corresponding call to :func:`trio.run` has
447+
already completed.
448+
Cancelled: if the corresponding call to :func:`trio.run` completes
449+
while ``afn(*args)`` is running, then ``afn`` is likely to raise
450+
:class:`Cancelled`, and this will propagate out into
451+
RuntimeError: if you try calling this from inside the Trio thread,
452+
which would otherwise cause a deadlock.
453+
AttributeError: if run()'s thread local storage does not have a token.
454+
This happens when it was not spawned from trio.run_sync_in_thread.
455+
456+
**Locating a Trio Token**: There are two ways to specify which
457+
:func: ``trio.run()`` loop to reenter::
458+
459+
- Spawn this thread from :func: ``run_sync_in_thread``. This will
460+
"inject" the current Trio Token into thread local storage and allow
461+
this function to re-enter the same :func: ``trio.run()`` loop.
462+
- Pass a keyword argument, ``trio_token`` specifiying a specific
463+
:func: ``trio.run()`` loop to re-enter. This is the "legacy" way of
464+
re-entering a trio thread and is similar to the old
465+
`BlockingTrioPortal`.
466+
"""
467+
468+
def callback(q, afn, args):
469+
@disable_ki_protection
470+
async def unprotected_afn():
471+
return await afn(*args)
472+
473+
async def await_in_trio_thread_task():
474+
q.put_nowait(await outcome.acapture(unprotected_afn))
475+
476+
trio.hazmat.spawn_system_task(await_in_trio_thread_task, name=afn)
477+
478+
return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token)
479+
480+
481+
def run_sync(fn, *args, trio_token=None):
482+
"""Run the given sync function in the parent Trio thread, blocking until it
483+
is complete.
484+
485+
Returns:
486+
Whatever ``fn(*args)`` returns.
487+
488+
Returns or raises whatever the given function returns or raises. It
489+
can also raise exceptions of its own:
490+
491+
Raises:
492+
RunFinishedError: if the corresponding call to :func:`trio.run` has
493+
already completed.
494+
Cancelled: if the corresponding call to :func:`trio.run` completes
495+
while ``afn(*args)`` is running, then ``afn`` is likely to raise
496+
:class:`Cancelled`, and this will propagate out into
497+
RuntimeError: if you try calling this from inside the Trio thread,
498+
which would otherwise cause a deadlock.
499+
AttributeError: if run()'s thread local storage does not have a token.
500+
This happens when it was not spawned from trio.run_sync_in_thread.
501+
502+
**Locating a Trio Token**: There are two ways to specify which
503+
:func: ``trio.run()`` loop to reenter::
504+
505+
- Spawn this thread from :func: ``run_sync_in_thread``. This will
506+
"inject" the current Trio Token into thread local storage and allow
507+
this function to re-enter the same :func: ``trio.run()`` loop.
508+
- Pass a keyword argument, ``trio_token`` specifiying a specific
509+
:func: ``trio.run()`` loop to re-enter. This is the "legacy" way of
510+
re-entering a trio thread and is similar to the old
511+
`BlockingTrioPortal`.
512+
"""
513+
514+
def callback(q, fn, args):
515+
@disable_ki_protection
516+
def unprotected_fn():
517+
return fn(*args)
518+
519+
res = outcome.capture(unprotected_fn)
520+
q.put_nowait(res)
521+
522+
return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token)

trio/from_thread.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""
2+
This namespace represents special functions that can call back into Trio from
3+
an external thread by means of a Trio Token present in Thread Local Storage
4+
"""
5+
6+
from ._threads import (run_sync, run)

trio/tests/test_threads.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .. import Event, CapacityLimiter, sleep
99
from ..testing import wait_all_tasks_blocked
1010
from .._threads import *
11+
from .._threads import run, run_sync # Not in __all__, must import explicitly
1112

1213
from .._core.tests.test_ki import ki_self
1314
from .._core.tests.tutil import slow
@@ -457,3 +458,88 @@ def bad_start(self):
457458
assert "engines" in str(excinfo.value)
458459

459460
assert limiter.borrowed_tokens == 0
461+
462+
463+
async def test_trio_run_sync_in_thread_token():
464+
# Test that run_sync_in_thread automatically injects the current trio token
465+
# into a spawned thread
466+
467+
def thread_fn():
468+
current_thread = threading.current_thread()
469+
callee_token = getattr(current_thread, 'current_trio_token')
470+
return callee_token
471+
472+
caller_token = _core.current_trio_token()
473+
callee_token = await run_sync_in_thread(thread_fn)
474+
assert callee_token == caller_token
475+
476+
477+
async def test_trio_from_thread_run_sync():
478+
# Test that run_sync_in_thread correctly "hands off" the trio token to
479+
# trio.from_thread.run_sync()
480+
def thread_fn():
481+
start = run_sync(_core.current_time)
482+
end = run_sync(_core.current_time)
483+
return end - start
484+
485+
duration = await run_sync_in_thread(thread_fn)
486+
assert duration > 0
487+
488+
489+
async def test_trio_from_thread_run():
490+
# Test that run_sync_in_thread correctly "hands off" the trio token to
491+
# trio.from_thread.run()
492+
def thread_fn():
493+
start = time.perf_counter()
494+
run(sleep, 0.05)
495+
end = time.perf_counter()
496+
return end - start
497+
498+
duration = await run_sync_in_thread(thread_fn)
499+
assert duration > 0
500+
501+
502+
async def test_trio_from_thread_token():
503+
# Test that run_sync_in_thread and spawned trio.from_thread.run_sync()
504+
# share the same Trio token
505+
def thread_fn():
506+
callee_token = run_sync(_core.current_trio_token)
507+
return callee_token
508+
509+
caller_token = _core.current_trio_token()
510+
callee_token = await run_sync_in_thread(thread_fn)
511+
assert callee_token == caller_token
512+
513+
514+
async def test_trio_from_thread_token_kwarg():
515+
# Test that run_sync_in_thread and spawned trio.from_thread.run_sync() can
516+
# use an explicitly defined token
517+
def thread_fn(token):
518+
callee_token = run_sync(_core.current_trio_token, trio_token=token)
519+
return callee_token
520+
521+
caller_token = _core.current_trio_token()
522+
callee_token = await run_sync_in_thread(thread_fn, caller_token)
523+
assert callee_token == caller_token
524+
525+
526+
async def test_trio_from_thread_both_run():
527+
# Test that trio.from_thread.run() and from_thread.run_sync() can run in
528+
# the same thread together
529+
530+
def thread_fn():
531+
start = run_sync(_core.current_time)
532+
run(sleep, 0.05)
533+
end = run_sync(_core.current_time)
534+
return end - start
535+
536+
duration = await run_sync_in_thread(thread_fn)
537+
assert duration > 0
538+
539+
540+
async def test_trio_from_thread_raw_call():
541+
# Test that a "raw call" to trio.from_thread.run() fails because no token
542+
# has been provided
543+
544+
with pytest.raises(AttributeError):
545+
run_sync(_core.current_time)

0 commit comments

Comments
 (0)