Skip to content

Commit 1768aea

Browse files
committed
Add new trio.to_thread module for running worker threads
For consistency with trio.from_thread, and to give us a place for future extensions, like utilities for pushing context managers into threads. See python-triogh-810.
1 parent a9bb934 commit 1768aea

26 files changed

+176
-180
lines changed

docs/source/history.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,7 +655,7 @@ CPython, or PyPy3 5.9+.
655655
Other changes
656656
~~~~~~~~~~~~~
657657

658-
* :func:`run_sync_in_thread` now has a :ref:`robust mechanism
658+
* ``run_sync_in_worker_thread`` now has a :ref:`robust mechanism
659659
for applying capacity limits to the number of concurrent threads
660660
<worker-thread-limiting>` (`#10
661661
<https://github.com/python-trio/trio/issues/170>`__, `#57

docs/source/reference-core.rst

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,7 +1471,7 @@ In acknowledgment of this reality, Trio provides two useful utilities
14711471
for working with real, operating-system level,
14721472
:mod:`threading`\-module-style threads. First, if you're in Trio but
14731473
need to push some blocking I/O into a thread, there's
1474-
:func:`run_sync_in_thread`. And if you're in a thread and need
1474+
`trio.to_thread.run_sync`. And if you're in a thread and need
14751475
to communicate back with Trio, you can use
14761476
:func:`trio.from_thread.run` and :func:`trio.from_thread.run_sync`.
14771477

@@ -1494,7 +1494,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
14941494
threads start executing the first N jobs, while the other
14951495
(100,000 - N) jobs sit in a queue and wait their turn. Which is
14961496
generally what you want, and this is how
1497-
:func:`trio.run_sync_in_thread` works by default.
1497+
:func:`trio.to_thread.run_sync` works by default.
14981498

14991499
The downside of this kind of thread pool is that sometimes, you need
15001500
more sophisticated logic for controlling how many threads are run at
@@ -1541,16 +1541,16 @@ re-using threads, but has no admission control policy: if you give it
15411541
responsible for providing the policy to make sure that this doesn't
15421542
happen – but since it *only* has to worry about policy, it can be much
15431543
simpler. In fact, all there is to it is the ``limiter=`` argument
1544-
passed to :func:`run_sync_in_thread`. This defaults to a global
1544+
passed to :func:`trio.to_thread.run_sync`. This defaults to a global
15451545
:class:`CapacityLimiter` object, which gives us the classic fixed-size
15461546
thread pool behavior. (See
1547-
:func:`current_default_thread_limiter`.) But if you want to use
1548-
"separate pools" for type A jobs and type B jobs, then it's just a
1549-
matter of creating two separate :class:`CapacityLimiter` objects and
1550-
passing them in when running these jobs. Or here's an example of
1551-
defining a custom policy that respects the global thread limit, while
1552-
making sure that no individual user can use more than 3 threads at a
1553-
time::
1547+
:func:`trio.to_thread.current_default_thread_limiter`.) But if you
1548+
want to use "separate pools" for type A jobs and type B jobs, then
1549+
it's just a matter of creating two separate :class:`CapacityLimiter`
1550+
objects and passing them in when running these jobs. Or here's an
1551+
example of defining a custom policy that respects the global thread
1552+
limit, while making sure that no individual user can use more than 3
1553+
threads at a time::
15541554

15551555
class CombinedLimiter:
15561556
def __init__(self, first, second):
@@ -1594,19 +1594,24 @@ time::
15941594
return combined_limiter
15951595

15961596

1597-
async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
1598-
# *args belong to async_fn; **kwargs belong to run_sync_in_thread
1597+
async def run_sync_in_thread_for_user(user_id, sync_fn, *args):
15991598
kwargs["limiter"] = get_user_limiter(user_id)
1600-
return await trio.run_sync_in_thread(asycn_fn, *args, **kwargs)
1599+
return await trio.to_thread.run_sync(asycn_fn, *args)
16011600

16021601

1602+
.. module:: trio.to_thread
1603+
.. currentmodule:: trio
1604+
16031605
Putting blocking I/O into worker threads
16041606
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
16051607

1606-
.. autofunction:: run_sync_in_thread
1608+
.. autofunction:: trio.to_thread.run_sync
1609+
1610+
.. autofunction:: trio.to_thread.current_default_thread_limiter
16071611

1608-
.. autofunction:: current_default_thread_limiter
16091612

1613+
.. module:: trio.from_thread
1614+
.. currentmodule:: trio
16101615

16111616
Getting back into the Trio thread from another thread
16121617
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -1615,6 +1620,7 @@ Getting back into the Trio thread from another thread
16151620

16161621
.. autofunction:: trio.from_thread.run_sync
16171622

1623+
16181624
This will probably be clearer with an example. Here we demonstrate how
16191625
to spawn a child thread, and then use a :ref:`memory channel
16201626
<channels>` to send messages between the thread and a Trio task:

docs/source/reference-core/from-thread-example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ async def main():
2323
# In a background thread, run:
2424
# thread_fn(portal, receive_from_trio, send_to_trio)
2525
nursery.start_soon(
26-
trio.run_sync_in_thread, thread_fn, receive_from_trio, send_to_trio
26+
trio.to_thread.run_sync, thread_fn, receive_from_trio, send_to_trio
2727
)
2828

2929
# prints "1"

docs/source/reference-hazmat.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,8 @@ This logic is a bit convoluted, but accomplishes all of the following:
403403
loop outside of the ``except BlockingIOError:`` block.
404404

405405
These functions can also be useful in other situations. For example,
406-
when :func:`trio.run_sync_in_thread` schedules some work to run
407-
in a worker thread, it blocks until the work is finished (so it's a
406+
when :func:`trio.to_thread.run_sync` schedules some work to run in a
407+
worker thread, it blocks until the work is finished (so it's a
408408
schedule point), but by default it doesn't allow cancellation. So to
409409
make sure that the call always acts as a checkpoint, it calls
410410
:func:`checkpoint_if_cancelled` before starting the thread.

docs/source/reference-io.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ To understand why, you need to know two things.
492492
First, right now no mainstream operating system offers a generic,
493493
reliable, native API for async file or filesystem operations, so we
494494
have to fake it by using threads (specifically,
495-
:func:`run_sync_in_thread`). This is cheap but isn't free: on a
495+
:func:`trio.to_thread.run_sync`). This is cheap but isn't free: on a
496496
typical PC, dispatching to a worker thread adds something like ~100 µs
497497
of overhead to each operation. ("µs" is pronounced "microseconds", and
498498
there are 1,000,000 µs in a second. Note that all the numbers here are

newsfragments/810.removal.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
``run_sync_in_worker_thread`` was too much of a mouthful – now it's
2-
just called `run_sync_in_thread` (though the old name still works with
3-
a deprecation warning, for now). Similarly,
4-
``current_default_worker_thread_limiter`` is becoming
5-
`current_default_thread_limiter`.
1+
``run_sync_in_worker_thread`` has become `trio.in_thread.run_sync`, in
2+
order to make it shorter, and more consistent with the new
3+
``trio.from_thread``. And ``current_default_worker_thread_limiter`` is
4+
now `trio.to_thread.current_default_thread_limiter`. (Of course the
5+
old names still work with a deprecation warning, for now.)

notes-to-self/blocking-read-hack.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ async def kill_it_after_timeout(new_fd):
2929
async with trio.open_nursery() as nursery:
3030
nursery.start_soon(kill_it_after_timeout, new_fd)
3131
try:
32-
data = await trio.run_sync_in_thread(os.read, new_fd, count)
32+
data = await trio.to_thread.run_sync(os.read, new_fd, count)
3333
except OSError as exc:
3434
if cancel_requested and exc.errno == errno.ENOTCONN:
3535
# Call was successfully cancelled. In a real version we'd

notes-to-self/thread-dispatch-bench.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# minimal a fashion as possible.
33
#
44
# This is useful to get a sense of the *lower-bound* cost of
5-
# run_sync_in_thread
5+
# trio.to_thread.run_sync
66

77
import threading
88
from queue import Queue

trio/__init__.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition
3333
)
3434

35-
from ._threads import (run_sync_in_thread, current_default_thread_limiter)
3635
from ._threads import BlockingTrioPortal as _BlockingTrioPortal
3736

3837
from ._highlevel_generic import aclose_forcefully, StapledStream
@@ -67,12 +66,14 @@
6766

6867
from ._deprecate import TrioDeprecationWarning
6968

70-
# Imported by default
69+
# Submodules imported by default
7170
from . import hazmat
7271
from . import socket
7372
from . import abc
7473
from . import from_thread
75-
# Not imported by default: testing
74+
from . import to_thread
75+
# Not imported by default, but mentioned here so static analysis tools like
76+
# pylint will know that it exists.
7677
if False:
7778
from . import testing
7879

@@ -104,13 +105,13 @@
104105
),
105106
"run_sync_in_worker_thread":
106107
_deprecate.DeprecatedAttribute(
107-
run_sync_in_thread,
108+
to_thread.run_sync,
108109
"0.12.0",
109110
issue=810,
110111
),
111112
"current_default_worker_thread_limiter":
112113
_deprecate.DeprecatedAttribute(
113-
current_default_thread_limiter,
114+
to_thread.current_default_thread_limiter,
114115
"0.12.0",
115116
issue=810,
116117
),
@@ -163,6 +164,7 @@
163164
fixup_module_metadata(socket.__name__, socket.__dict__)
164165
fixup_module_metadata(abc.__name__, abc.__dict__)
165166
fixup_module_metadata(from_thread.__name__, from_thread.__dict__)
167+
fixup_module_metadata(to_thread.__name__, to_thread.__dict__)
166168
fixup_module_metadata(__name__ + ".ssl", _deprecated_ssl_reexports.__dict__)
167169
fixup_module_metadata(
168170
__name__ + ".subprocess", _deprecated_subprocess_reexports.__dict__

trio/_core/_entry_queue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,10 @@ class TrioToken:
131131
This object has two uses:
132132
133133
1. It lets you re-enter the Trio run loop from external threads or signal
134-
handlers. This is the low-level primitive that
135-
:func:`trio.run_sync_in_thread` uses to receive results from
136-
worker threads, that :func:`trio.open_signal_receiver` uses to receive
137-
notifications about signals, and so forth.
134+
handlers. This is the low-level primitive that :func:`trio.to_thread`
135+
and `trio.from_thread` use to communicate with worker threads, that
136+
`trio.open_signal_receiver` uses to receive notifications about
137+
signals, and so forth.
138138
139139
2. Each call to :func:`trio.run` has exactly one associated
140140
:class:`TrioToken` object, so you can use it to identify a particular

trio/_core/_traps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def abort_func(raise_cancel):
111111
At that point there are again two possibilities. You can simply ignore
112112
the cancellation altogether: wait for the operation to complete and
113113
then reschedule and continue as normal. (For example, this is what
114-
:func:`trio.run_sync_in_thread` does if cancellation is disabled.)
114+
:func:`trio.to_thread.run_sync` does if cancellation is disabled.)
115115
The other possibility is that the ``abort_func`` does succeed in
116116
cancelling the operation, but for some reason isn't able to report that
117117
right away. (Example: on Windows, it's possible to request that an

trio/_core/tests/test_run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from .tutil import check_sequence_matches, gc_collect_harder
1919
from ... import _core
20-
from ..._threads import run_sync_in_thread
20+
from ..._threads import to_thread_run_sync
2121
from ..._timeouts import sleep, fail_after
2222
from ..._util import aiter_compat
2323
from ...testing import (
@@ -552,7 +552,7 @@ async def test_cancel_scope_repr(mock_clock):
552552
scope.deadline = _core.current_time() + 10
553553
assert "deadline is 10.00 seconds from now" in repr(scope)
554554
# when not in async context, can't get the current time
555-
assert "deadline" not in await run_sync_in_thread(repr, scope)
555+
assert "deadline" not in await to_thread_run_sync(repr, scope)
556556
scope.cancel()
557557
assert "cancelled" in repr(scope)
558558
assert "exited" in repr(scope)

trio/_file_io.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
class AsyncIOWrapper(AsyncResource):
5454
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous
5555
file object` interface. Wrapped methods that could block are executed in
56-
:meth:`trio.run_sync_in_thread`.
56+
:meth:`trio.to_thread.run_sync`.
5757
5858
All properties and methods defined in in :mod:`~io` are exposed by this
5959
wrapper, if they exist in the wrapped file object.
@@ -80,7 +80,7 @@ def __getattr__(self, name):
8080
@async_wraps(self.__class__, self._wrapped.__class__, name)
8181
async def wrapper(*args, **kwargs):
8282
func = partial(meth, *args, **kwargs)
83-
return await trio.run_sync_in_thread(func)
83+
return await trio.to_thread.run_sync(func)
8484

8585
# cache the generated method
8686
setattr(self, name, wrapper)
@@ -115,7 +115,7 @@ async def detach(self):
115115
116116
"""
117117

118-
raw = await trio.run_sync_in_thread(self._wrapped.detach)
118+
raw = await trio.to_thread.run_sync(self._wrapped.detach)
119119
return wrap_file(raw)
120120

121121
async def aclose(self):
@@ -128,7 +128,7 @@ async def aclose(self):
128128

129129
# ensure the underling file is closed during cancellation
130130
with trio.CancelScope(shield=True):
131-
await trio.run_sync_in_thread(self._wrapped.close)
131+
await trio.to_thread.run_sync(self._wrapped.close)
132132

133133
await trio.hazmat.checkpoint_if_cancelled()
134134

@@ -165,7 +165,7 @@ async def open_file(
165165
file = fspath(file)
166166

167167
_file = wrap_file(
168-
await trio.run_sync_in_thread(
168+
await trio.to_thread.run_sync(
169169
io.open, file, mode, buffering, encoding, errors, newline, closefd,
170170
opener
171171
)

trio/_path.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def iter_wrapper_factory(cls, meth_name):
5858
async def wrapper(self, *args, **kwargs):
5959
meth = getattr(self._wrapped, meth_name)
6060
func = partial(meth, *args, **kwargs)
61-
items = await trio.run_sync_in_thread(func)
61+
items = await trio.to_thread.run_sync(func)
6262
return (rewrap_path(item) for item in items)
6363

6464
return wrapper
@@ -70,7 +70,7 @@ async def wrapper(self, *args, **kwargs):
7070
args = unwrap_paths(args)
7171
meth = getattr(self._wrapped, meth_name)
7272
func = partial(meth, *args, **kwargs)
73-
value = await trio.run_sync_in_thread(func)
73+
value = await trio.to_thread.run_sync(func)
7474
return rewrap_path(value)
7575

7676
return wrapper
@@ -83,7 +83,7 @@ async def wrapper(cls, *args, **kwargs):
8383
args = unwrap_paths(args)
8484
meth = getattr(cls._wraps, meth_name)
8585
func = partial(meth, *args, **kwargs)
86-
value = await trio.run_sync_in_thread(func)
86+
value = await trio.to_thread.run_sync(func)
8787
return rewrap_path(value)
8888

8989
return wrapper
@@ -145,7 +145,7 @@ def generate_iter(cls, attrs):
145145

146146
class Path(metaclass=AsyncAutoWrapperType):
147147
"""A :class:`pathlib.Path` wrapper that executes blocking methods in
148-
:meth:`trio.run_sync_in_thread`.
148+
:meth:`trio.to_thread.run_sync`.
149149
150150
"""
151151

@@ -185,7 +185,7 @@ async def open(self, *args, **kwargs):
185185
"""
186186

187187
func = partial(self._wrapped.open, *args, **kwargs)
188-
value = await trio.run_sync_in_thread(func)
188+
value = await trio.to_thread.run_sync(func)
189189
return trio.wrap_file(value)
190190

191191

trio/_socket.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import idna as _idna
88

99
import trio
10-
from ._threads import run_sync_in_thread
1110
from ._util import fspath
1211
from . import _core
1312

@@ -179,7 +178,7 @@ def numeric_only_failure(exc):
179178
if hr is not None:
180179
return await hr.getaddrinfo(host, port, family, type, proto, flags)
181180
else:
182-
return await run_sync_in_thread(
181+
return await trio.to_thread.run_sync(
183182
_stdlib_socket.getaddrinfo,
184183
host,
185184
port,
@@ -205,7 +204,7 @@ async def getnameinfo(sockaddr, flags):
205204
if hr is not None:
206205
return await hr.getnameinfo(sockaddr, flags)
207206
else:
208-
return await run_sync_in_thread(
207+
return await trio.to_thread.run_sync(
209208
_stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True
210209
)
211210

@@ -216,7 +215,7 @@ async def getprotobyname(name):
216215
Like :func:`socket.getprotobyname`, but async.
217216
218217
"""
219-
return await run_sync_in_thread(
218+
return await trio.to_thread.run_sync(
220219
_stdlib_socket.getprotobyname, name, cancellable=True
221220
)
222221

@@ -464,7 +463,7 @@ async def bind(self, address):
464463
):
465464
# Use a thread for the filesystem traversal (unless it's an
466465
# abstract domain socket)
467-
return await run_sync_in_thread(self._sock.bind, address)
466+
return await trio.to_thread.run_sync(self._sock.bind, address)
468467
else:
469468
# POSIX actually says that bind can return EWOULDBLOCK and
470469
# complete asynchronously, like connect. But in practice AFAICT

trio/_subprocess_platform/waitid.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from .. import _core, _subprocess
77
from .._sync import CapacityLimiter, Event
8-
from .._threads import run_sync_in_thread
8+
from .._threads import to_thread_run_sync
99

1010
try:
1111
from os import waitid
@@ -74,7 +74,7 @@ async def _waitid_system_task(pid: int, event: Event) -> None:
7474
# call to trio.run is shutting down.
7575

7676
try:
77-
await run_sync_in_thread(
77+
await to_thread_run_sync(
7878
sync_wait_reapable,
7979
pid,
8080
cancellable=True,

0 commit comments

Comments
 (0)