diff --git a/docs/source/history.rst b/docs/source/history.rst index 475ba46043..1bcef84365 100644 --- a/docs/source/history.rst +++ b/docs/source/history.rst @@ -564,7 +564,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0 * We took the opportunity to refactor ``run_in_trio_thread`` and ``await_in_trio_thread`` into the new class - :class:`trio.BlockingTrioPortal` + ``trio.BlockingTrioPortal`` * The hazmat function ``current_call_soon_thread_and_signal_safe`` is being replaced by :class:`trio.hazmat.TrioToken` diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 12c6ca9a75..e6729a1a4c 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1472,8 +1472,8 @@ for working with real, operating-system level, :mod:`threading`\-module-style threads. First, if you're in Trio but need to push some blocking I/O into a thread, there's :func:`run_sync_in_thread`. And if you're in a thread and need -to communicate back with Trio, you can use a -:class:`BlockingTrioPortal`. +to communicate back with Trio, you can use +:func:`trio.from_thread.run` and :func:`trio.from_thread.run_sync`. .. _worker-thread-limiting: @@ -1611,14 +1611,15 @@ Putting blocking I/O into worker threads Getting back into the Trio thread from another thread ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. autoclass:: BlockingTrioPortal - :members: +.. autofunction:: trio.from_thread.run + +.. autofunction:: trio.from_thread.run_sync This will probably be clearer with an example. Here we demonstrate how to spawn a child thread, and then use a :ref:`memory channel ` to send messages between the thread and a Trio task: -.. literalinclude:: reference-core/blocking-trio-portal-example.py +.. literalinclude:: reference-core/from-thread-example.py Exceptions and warnings diff --git a/docs/source/reference-core/blocking-trio-portal-example.py b/docs/source/reference-core/from-thread-example.py similarity index 70% rename from docs/source/reference-core/blocking-trio-portal-example.py rename to docs/source/reference-core/from-thread-example.py index 99200b932f..b8ea8f574a 100644 --- a/docs/source/reference-core/blocking-trio-portal-example.py +++ b/docs/source/reference-core/from-thread-example.py @@ -1,20 +1,21 @@ import trio -def thread_fn(portal, receive_from_trio, send_to_trio): + +def thread_fn(receive_from_trio, send_to_trio): while True: # Since we're in a thread, we can't call methods on Trio - # objects directly -- so we use our portal to call them. + # objects directly -- so we use trio.from_thread to call them. try: - request = portal.run(receive_from_trio.receive) + request = trio.from_thread.run(receive_from_trio.receive) except trio.EndOfChannel: - portal.run(send_to_trio.aclose) + trio.from_thread.run(send_to_trio.aclose) return else: response = request + 1 - portal.run(send_to_trio.send, response) + trio.from_thread.run(send_to_trio.send, response) + async def main(): - portal = trio.BlockingTrioPortal() send_to_thread, receive_from_trio = trio.open_memory_channel(0) send_to_trio, receive_from_thread = trio.open_memory_channel(0) @@ -22,8 +23,7 @@ async def main(): # In a background thread, run: # thread_fn(portal, receive_from_trio, send_to_trio) nursery.start_soon( - trio.run_sync_in_thread, - thread_fn, portal, receive_from_trio, send_to_trio + trio.run_sync_in_thread, thread_fn, receive_from_trio, send_to_trio ) # prints "1" @@ -40,4 +40,5 @@ async def main(): # When we exit the nursery, it waits for the background thread to # exit. + trio.run(main) diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index 3cf9bc2890..6cad847629 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -280,11 +280,11 @@ These transitions are accomplished using two function decorators: function). An example of where you'd use this is in implementing something - like :meth:`trio.BlockingTrioPortal.run`, which uses + like :func:`trio.from_thread.run`, which uses :meth:`TrioToken.run_sync_soon` to get into the Trio thread. :meth:`~TrioToken.run_sync_soon` callbacks are run with :exc:`KeyboardInterrupt` protection enabled, and - :meth:`~trio.BlockingTrioPortal.run` takes advantage of this to safely set up + :func:`trio.from_thread.run` takes advantage of this to safely set up the machinery for sending a response back to the original thread, but then uses :func:`disable_ki_protection` when entering the user-provided function. diff --git a/trio/__init__.py b/trio/__init__.py index 34864e49cc..9d9409bb97 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -32,9 +32,8 @@ Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition ) -from ._threads import ( - run_sync_in_thread, current_default_thread_limiter, BlockingTrioPortal -) +from ._threads import (run_sync_in_thread, current_default_thread_limiter) +from ._threads import BlockingTrioPortal as _BlockingTrioPortal from ._highlevel_generic import aclose_forcefully, StapledStream @@ -72,6 +71,7 @@ from . import hazmat from . import socket from . import abc +from . import from_thread # Not imported by default: testing if False: from . import testing @@ -142,6 +142,13 @@ "0.12.0", issue=878, ), + "BlockingTrioPortal": + _deprecate.DeprecatedAttribute( + _BlockingTrioPortal, + "0.12.0", + issue=810, + instead=from_thread, + ), } # Having the public path in .__module__ attributes is important for: @@ -155,6 +162,7 @@ fixup_module_metadata(hazmat.__name__, hazmat.__dict__) fixup_module_metadata(socket.__name__, socket.__dict__) fixup_module_metadata(abc.__name__, abc.__dict__) +fixup_module_metadata(from_thread.__name__, from_thread.__dict__) fixup_module_metadata(__name__ + ".ssl", _deprecated_ssl_reexports.__dict__) fixup_module_metadata( __name__ + ".subprocess", _deprecated_subprocess_reexports.__dict__ diff --git a/trio/_core/_exceptions.py b/trio/_core/_exceptions.py index b8b363af1b..5b150671b7 100644 --- a/trio/_core/_exceptions.py +++ b/trio/_core/_exceptions.py @@ -18,7 +18,7 @@ class TrioInternalError(Exception): class RunFinishedError(RuntimeError): - """Raised by `BlockingTrioPortal.run` and similar functions if the + """Raised by `trio.from_thread.run` and similar functions if the corresponding call to :func:`trio.run` has already finished. """ diff --git a/trio/_threads.py b/trio/_threads.py index 1e3efd526b..d4a064fb32 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -8,7 +8,7 @@ import trio from ._sync import CapacityLimiter -from ._core import enable_ki_protection, disable_ki_protection, RunVar +from ._core import enable_ki_protection, disable_ki_protection, RunVar, TrioToken __all__ = [ "run_sync_in_thread", @@ -16,111 +16,21 @@ "BlockingTrioPortal", ] +# Global due to Threading API, thread local storage for trio token +TOKEN_LOCAL = threading.local() -class BlockingTrioPortal: - """A portal that synchronous threads can reach through to run code in the - Trio thread. - - Most Trio functions can only be called from the Trio thread, which is - sometimes annoying. What if you really need to call a Trio function from a - worker thread? That's where :class:`BlockingTrioPortal` comes in: it's the - rare Trio object whose methods can – in fact, must! – be called from - another thread, and it allows you to call all those other functions. - - There is one complication: it's possible for a single Python program to - contain multiple calls to :func:`trio.run`, either in sequence – like in a - test suite that calls :func:`trio.run` for each test – or simultaneously - in different threads. So how do you control which :func:`trio.run` your - portal opens into? - - The answer is that each :class:`BlockingTrioPortal` object is associated - with one *specific* call to :func:`trio.run`. - - The simplest way to set this up is to instantiate the class with no - arguments inside Trio; this automatically binds it to the context where - you instantiate it:: - - async def some_function(): - portal = trio.BlockingTrioPortal() - await trio.run_sync_in_thread(sync_fn, portal) - - Alternatively, you can pass an explicit :class:`trio.hazmat.TrioToken` to - specify the :func:`trio.run` that you want your portal to connect to. - - """ +class BlockingTrioPortal: def __init__(self, trio_token=None): if trio_token is None: trio_token = trio.hazmat.current_trio_token() self._trio_token = trio_token - # This is the part that runs in the Trio thread - def _run_cb(self, q, afn, args): - @disable_ki_protection - async def unprotected_afn(): - return await afn(*args) - - async def await_in_trio_thread_task(): - q.put_nowait(await outcome.acapture(unprotected_afn)) - - trio.hazmat.spawn_system_task(await_in_trio_thread_task, name=afn) - - # This is the part that runs in the Trio thread - def _run_sync_cb(self, q, fn, args): - @disable_ki_protection - def unprotected_fn(): - return fn(*args) - - res = outcome.capture(unprotected_fn) - q.put_nowait(res) - - def _do_it(self, cb, fn, *args): - try: - trio.hazmat.current_task() - except RuntimeError: - pass - else: - raise RuntimeError( - "this is a blocking function; call it from a thread" - ) - q = stdlib_queue.Queue() - self._trio_token.run_sync_soon(cb, q, fn, args) - return q.get().unwrap() - def run(self, afn, *args): - """Run the given async function in the Trio thread, blocking until it - is complete. - - Returns or raises whatever the given function returns or raises. It - can also raise exceptions of its own: - - Raises: - RunFinishedError: if the corresponding call to :func:`trio.run` has - already completed. - Cancelled: if the corresponding call to :func:`trio.run` completes - while ``afn(*args)`` is running, then ``afn`` is likely to raise - :class:`Cancelled`, and this will propagate out into - RuntimeError: if you try calling this from inside the Trio thread, - which would otherwise cause a deadlock. - - """ - return self._do_it(self._run_cb, afn, *args) + return run(afn, *args, trio_token=self._trio_token) def run_sync(self, fn, *args): - """Run the given synchronous function in the Trio thread, blocking - until it is complete. - - Returns or raises whatever the given function returns or raises. It - can also exceptions of its own: - - Raises: - RunFinishedError: if the corresponding call to :func:`trio.run` has - already completed. - RuntimeError: if you try calling this from inside the Trio thread, - which would otherwise cause a deadlock. - - """ - return self._do_it(self._run_sync_cb, fn, *args) + return run_sync(fn, *args, trio_token=self._trio_token) ################################################################ @@ -269,6 +179,12 @@ async def run_sync_in_thread(sync_fn, *args, cancellable=False, limiter=None): tasks to continue working while ``sync_fn`` runs. This is accomplished by pushing the call to ``sync_fn(*args)`` off into a worker thread. + ``run_sync_in_thread`` also injects the current ``TrioToken`` into the + spawned thread's local storage so that these threads can re-enter the Trio + loop by calling either `trio.from_thread.run` or + `trio.from_thread.run_sync` for async or synchronous functions, + respectively. + Args: sync_fn: An arbitrary synchronous callable. *args: Positional arguments to pass to sync_fn. If you need keyword @@ -369,21 +285,31 @@ def do_release_then_return_result(): # This is the function that runs in the worker thread to do the actual # work and then schedule the call to report_back_in_trio_thread_fn - def worker_thread_fn(): - result = outcome.capture(sync_fn, *args) + # Since this is spawned in a new thread, the trio token needs to be passed + # explicitly to it so it can inject it into thread local storage + def worker_thread_fn(trio_token): + TOKEN_LOCAL.token = trio_token try: - token.run_sync_soon(report_back_in_trio_thread_fn, result) - except trio.RunFinishedError: - # The entire run finished, so our particular task is certainly - # long gone -- it must have cancelled. - pass + result = outcome.capture(sync_fn, *args) + try: + token.run_sync_soon(report_back_in_trio_thread_fn, result) + except trio.RunFinishedError: + # The entire run finished, so our particular task is certainly + # long gone -- it must have cancelled. + pass + finally: + del TOKEN_LOCAL.token await limiter.acquire_on_behalf_of(placeholder) try: # daemon=True because it might get left behind if we cancel, and in # this case shouldn't block process exit. + current_trio_token = trio.hazmat.current_trio_token() thread = threading.Thread( - target=worker_thread_fn, name=name, daemon=True + target=worker_thread_fn, + args=(current_trio_token,), + name=name, + daemon=True ) thread.start() except: @@ -398,3 +324,129 @@ def abort(_): return trio.hazmat.Abort.FAILED return await trio.hazmat.wait_task_rescheduled(abort) + + +def _run_fn_as_system_task(cb, fn, *args, trio_token=None): + """Helper function for from_thread.run and from_thread.run_sync. + + Since this internally uses TrioToken.run_sync_soon, all warnings about + raised exceptions canceling all tasks should be noted. + """ + + if trio_token and not isinstance(trio_token, TrioToken): + raise RuntimeError("Passed kwarg trio_token is not of type TrioToken") + + if not trio_token: + try: + trio_token = TOKEN_LOCAL.token + except AttributeError: + raise RuntimeError( + "this thread wasn't created by Trio, pass kwarg trio_token=..." + ) + + # TODO: This is only necessary for compatibility with BlockingTrioPortal. + # once that is deprecated, this check should no longer be necessary because + # thread local storage (or the absence of) is sufficient to check if trio + # is running in a thread or not. + try: + trio.hazmat.current_task() + except RuntimeError: + pass + else: + raise RuntimeError( + "this is a blocking function; call it from a thread" + ) + + q = stdlib_queue.Queue() + trio_token.run_sync_soon(cb, q, fn, args) + return q.get().unwrap() + + +def run(afn, *args, trio_token=None): + """Run the given async function in the parent Trio thread, blocking until it + is complete. + + Returns: + Whatever ``afn(*args)`` returns. + + Returns or raises whatever the given function returns or raises. It + can also raise exceptions of its own: + + Raises: + RunFinishedError: if the corresponding call to :func:`trio.run` has + already completed. + Cancelled: if the corresponding call to :func:`trio.run` completes + while ``afn(*args)`` is running, then ``afn`` is likely to raise + :exc:`trio.Cancelled`, and this will propagate out into + RuntimeError: if you try calling this from inside the Trio thread, + which would otherwise cause a deadlock. + AttributeError: if run()'s thread local storage does not have a token. + This happens when it was not spawned from trio.run_sync_in_thread. + + **Locating a Trio Token**: There are two ways to specify which + `trio.run` loop to reenter: + + - Spawn this thread from `trio.run_sync_in_thread`. This will + "inject" the current Trio Token into thread local storage and allow + this function to re-enter the same `trio.run` loop. + - Pass a keyword argument, ``trio_token`` specifiying a specific + `trio.run` loop to re-enter. This is the "legacy" way of + re-entering a trio thread and is similar to the old + ``BlockingTrioPortal``. + """ + + def callback(q, afn, args): + @disable_ki_protection + async def unprotected_afn(): + return await afn(*args) + + async def await_in_trio_thread_task(): + q.put_nowait(await outcome.acapture(unprotected_afn)) + + trio.hazmat.spawn_system_task(await_in_trio_thread_task, name=afn) + + return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token) + + +def run_sync(fn, *args, trio_token=None): + """Run the given sync function in the parent Trio thread, blocking until it + is complete. + + Returns: + Whatever ``fn(*args)`` returns. + + Returns or raises whatever the given function returns or raises. It + can also raise exceptions of its own: + + Raises: + RunFinishedError: if the corresponding call to `trio.run` has + already completed. + Cancelled: if the corresponding call to `trio.run` completes + while ``afn(*args)`` is running, then ``afn`` is likely to raise + :exc:`trio.Cancelled`, and this will propagate out into + RuntimeError: if you try calling this from inside the Trio thread, + which would otherwise cause a deadlock. + AttributeError: if run()'s thread local storage does not have a token. + This happens when it was not spawned from trio.run_sync_in_thread. + + **Locating a Trio Token**: There are two ways to specify which + `trio.run` loop to reenter: + + - Spawn this thread from `trio.run_sync_in_thread`. This will + "inject" the current Trio Token into thread local storage and allow + this function to re-enter the same `trio.run` loop. + - Pass a keyword argument, ``trio_token`` specifiying a specific + `trio.run` loop to re-enter. This is the "legacy" way of + re-entering a trio thread and is similar to the old + ``BlockingTrioPortal``. + """ + + def callback(q, fn, args): + @disable_ki_protection + def unprotected_fn(): + return fn(*args) + + res = outcome.capture(unprotected_fn) + q.put_nowait(res) + + return _run_fn_as_system_task(callback, fn, *args, trio_token=trio_token) diff --git a/trio/from_thread.py b/trio/from_thread.py new file mode 100644 index 0000000000..745bcbeafd --- /dev/null +++ b/trio/from_thread.py @@ -0,0 +1,6 @@ +""" +This namespace represents special functions that can call back into Trio from +an external thread by means of a Trio Token present in Thread Local Storage +""" + +from ._threads import (run_sync, run) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 5eb2b94181..306af92c27 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -8,6 +8,7 @@ from .. import Event, CapacityLimiter, sleep from ..testing import wait_all_tasks_blocked from .._threads import * +from .._threads import run, run_sync # Not in __all__, must import explicitly from .._core.tests.test_ki import ki_self from .._core.tests.tutil import slow @@ -16,13 +17,13 @@ async def test_do_in_trio_thread(): trio_thread = threading.current_thread() - async def check_case(do_in_trio_thread, fn, expected): + async def check_case(do_in_trio_thread, fn, expected, trio_token=None): record = [] def threadfn(): try: record.append(("start", threading.current_thread())) - x = do_in_trio_thread(fn, record) + x = do_in_trio_thread(fn, record, trio_token=trio_token) record.append(("got", x)) except BaseException as exc: print(exc) @@ -37,21 +38,21 @@ def threadfn(): ("start", child_thread), ("f", trio_thread), expected ] - portal = BlockingTrioPortal() + token = _core.current_trio_token() def f(record): assert not _core.currently_ki_protected() record.append(("f", threading.current_thread())) return 2 - await check_case(portal.run_sync, f, ("got", 2)) + await check_case(run_sync, f, ("got", 2), trio_token=token) def f(record): assert not _core.currently_ki_protected() record.append(("f", threading.current_thread())) raise ValueError - await check_case(portal.run_sync, f, ("error", ValueError)) + await check_case(run_sync, f, ("error", ValueError), trio_token=token) async def f(record): assert not _core.currently_ki_protected() @@ -59,7 +60,7 @@ async def f(record): record.append(("f", threading.current_thread())) return 3 - await check_case(portal.run, f, ("got", 3)) + await check_case(run, f, ("got", 3), trio_token=token) async def f(record): assert not _core.currently_ki_protected() @@ -67,33 +68,18 @@ async def f(record): record.append(("f", threading.current_thread())) raise KeyError - await check_case(portal.run, f, ("error", KeyError)) + await check_case(run, f, ("error", KeyError), trio_token=token) async def test_do_in_trio_thread_from_trio_thread(): - portal = BlockingTrioPortal() - with pytest.raises(RuntimeError): - portal.run_sync(lambda: None) # pragma: no branch + run_sync(lambda: None) # pragma: no branch async def foo(): # pragma: no cover pass with pytest.raises(RuntimeError): - portal.run(foo) - - -async def test_BlockingTrioPortal_with_explicit_TrioToken(): - token = _core.current_trio_token() - - def worker_thread(token): - with pytest.raises(RuntimeError): - BlockingTrioPortal() - portal = BlockingTrioPortal(token) - return portal.run_sync(threading.current_thread) - - t = await run_sync_in_thread(worker_thread, token) - assert t == threading.current_thread() + run(foo) def test_run_in_trio_thread_ki(): @@ -102,7 +88,7 @@ def test_run_in_trio_thread_ki(): record = set() async def check_run_in_trio_thread(): - portal = BlockingTrioPortal() + token = _core.current_trio_token() def trio_thread_fn(): print("in Trio thread") @@ -120,12 +106,12 @@ async def trio_thread_afn(): def external_thread_fn(): try: print("running") - portal.run_sync(trio_thread_fn) + run_sync(trio_thread_fn, trio_token=token) except KeyboardInterrupt: print("ok1") record.add("ok1") try: - portal.run(trio_thread_afn) + run(trio_thread_afn, trio_token=token) except KeyboardInterrupt: print("ok2") record.add("ok2") @@ -152,15 +138,15 @@ async def trio_fn(): ev.set() await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED) - def thread_fn(portal): + def thread_fn(token): try: - portal.run(trio_fn) + run(trio_fn, trio_token=token) except _core.Cancelled: record.append("cancelled") async def main(): - portal = BlockingTrioPortal() - thread = threading.Thread(target=thread_fn, args=(portal,)) + token = _core.current_trio_token() + thread = threading.Thread(target=thread_fn, args=(token,)) thread.start() await ev.wait() assert record == ["sleeping"] @@ -322,11 +308,11 @@ class state: state.running = 0 state.parked = 0 - portal = BlockingTrioPortal() + token = _core.current_trio_token() def thread_fn(cancel_scope): print("thread_fn start") - portal.run_sync(cancel_scope.cancel) + run_sync(cancel_scope.cancel, trio_token=token) with lock: state.ran += 1 state.running += 1 @@ -457,3 +443,112 @@ def bad_start(self): assert "engines" in str(excinfo.value) assert limiter.borrowed_tokens == 0 + + +async def test_trio_run_sync_in_thread_token(): + # Test that run_sync_in_thread automatically injects the current trio token + # into a spawned thread + def thread_fn(): + callee_token = run_sync(_core.current_trio_token) + return callee_token + + caller_token = _core.current_trio_token() + callee_token = await run_sync_in_thread(thread_fn) + assert callee_token == caller_token + + +async def test_trio_from_thread_run_sync(): + # Test that run_sync_in_thread correctly "hands off" the trio token to + # trio.from_thread.run_sync() + def thread_fn(): + trio_time = run_sync(_core.current_time) + return trio_time + + trio_time = await run_sync_in_thread(thread_fn) + assert isinstance(trio_time, float) + + +async def test_trio_from_thread_run(): + # Test that run_sync_in_thread correctly "hands off" the trio token to + # trio.from_thread.run() + record = [] + + async def back_in_trio_fn(): + _core.current_time() # implicitly checks that we're in trio + record.append("back in trio") + + def thread_fn(): + record.append("in thread") + run(back_in_trio_fn) + + await run_sync_in_thread(thread_fn) + assert record == ["in thread", "back in trio"] + + +async def test_trio_from_thread_token(): + # Test that run_sync_in_thread and spawned trio.from_thread.run_sync() + # share the same Trio token + def thread_fn(): + callee_token = run_sync(_core.current_trio_token) + return callee_token + + caller_token = _core.current_trio_token() + callee_token = await run_sync_in_thread(thread_fn) + assert callee_token == caller_token + + +async def test_trio_from_thread_token_kwarg(): + # Test that run_sync_in_thread and spawned trio.from_thread.run_sync() can + # use an explicitly defined token + def thread_fn(token): + callee_token = run_sync(_core.current_trio_token, trio_token=token) + return callee_token + + caller_token = _core.current_trio_token() + callee_token = await run_sync_in_thread(thread_fn, caller_token) + assert callee_token == caller_token + + +async def test_from_thread_no_token(): + # Test that a "raw call" to trio.from_thread.run() fails because no token + # has been provided + + with pytest.raises(RuntimeError): + run_sync(_core.current_time) + + +def test_run_fn_as_system_task_catched_badly_typed_token(): + with pytest.raises(RuntimeError): + run_sync(_core.current_time, trio_token="Not TrioTokentype") + + +async def test_do_in_trio_thread_from_trio_thread_legacy(): + # This check specifically confirms that a RuntimeError will be raised if + # the old BlockingTrIoPortal API calls into a trio loop while already + # running inside of one. + portal = BlockingTrioPortal() + + with pytest.raises(RuntimeError): + portal.run_sync(lambda: None) # pragma: no branch + + async def foo(): # pragma: no cover + pass + + with pytest.raises(RuntimeError): + portal.run(foo) + + +async def test_BlockingTrioPortal_with_explicit_TrioToken(): + # This tests the deprecated BlockingTrioPortal with a token passed in to + # confirm that both methods of making a portal are supported by + # trio.from_thread + token = _core.current_trio_token() + + def worker_thread(token): + with pytest.raises(RuntimeError): + BlockingTrioPortal() + portal = BlockingTrioPortal(token) + return portal.run_sync(threading.current_thread) + + t = await run_sync_in_thread(worker_thread, token) + assert t == threading.current_thread()