Make Process joining less likely to block #218
Conversation
There are two things to consider here: performance and cancellability. Suppose we have two ways to wait for something: blocking calls and polling.

Obviously, we cannot use blocking calls directly in an asynchronous environment, because they would block the event loop. Therefore, they are usually run in separate threads:

```python
await asyncio.to_thread(blocking_call, timeout)
```

At first glance everything works fine, and the wait is as fast as possible (not counting the overhead due to multithreading). But there are two nuances here:
```pycon
>>> import asyncio
>>> import os
>>> import sys
>>> import threading
>>> if sys.version_info >= (3, 13):
...     max_workers = min(32, (os.process_cpu_count() or 1) + 4)
... else:
...     max_workers = min(32, os.cpu_count() + 4)
...
>>> max_workers
8
>>> threading.active_count()
2  # == initial_count
>>> event = threading.Event()
>>> for _ in range(max_workers):
...     try:
...         await asyncio.wait_for(asyncio.to_thread(event.wait, 60), 1 / max_workers)
...     except TimeoutError:
...         pass
...
>>> threading.active_count()
10  # == 2 + 8 == initial_count + max_workers
>>> await asyncio.to_thread(print, "ok")  # hangs!
```

Now let us look at polling. It usually looks something like this:

```python
while not done:
    await asyncio.sleep(delay)
```

It has no cancellability issues, since `asyncio.sleep()` is cancellable, but its performance is limited by the polling interval:
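A key consequence of the hang above is that cancelling a task that awaits `asyncio.to_thread()` does not interrupt the underlying blocking call, so the worker thread stays occupied. A minimal self-contained sketch of this (using a `threading.Event` as the blocking call; runnable as a script, unlike the REPL session above):

```python
import asyncio
import threading

async def demo():
    event = threading.Event()
    before = threading.active_count()
    # Start a "blocking wait" in asyncio's default thread pool.
    task = asyncio.create_task(asyncio.to_thread(event.wait, 60))
    await asyncio.sleep(0.1)  # give the worker thread time to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Cancellation only detached the task; the worker thread is still
    # blocked inside event.wait() and keeps its pool slot occupied.
    after = threading.active_count()
    event.set()  # release the worker
    await asyncio.sleep(0.1)  # let it finish while the loop is still alive
    return before, after

before, after = asyncio.run(demo())
print(after > before)  # the worker thread outlived the cancelled task
```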
```pycon
>>> import asyncio
>>> import random
>>> import time
>>> delay = 0.005
>>> iterations = 100
>>> real_ops = 10_000
>>> ops = [None] * iterations
>>> for i in range(iterations):
...     task = asyncio.create_task(asyncio.sleep((1 / real_ops) * random.random()))
...     await asyncio.sleep(0)  # switch to the task
...     start = time.perf_counter()
...     while not task.done():
...         await asyncio.sleep(delay)
...     ops[i] = 1 / (time.perf_counter() - start)
...
>>> ops[len(ops) // 2]
185.88300231336297  # ~= (1 / 0.005) < 10_000
```

The most viable option is to combine both ways:

```python
try:
    if not done:
        async with asyncio.timeout(timeout):
            while not done:
                done = await asyncio.to_thread(blocking_call, delay)
except TimeoutError:
    pass  # done is False
else:
    pass  # done is True
```

Key benefits:
However, even this approach still has problems. In particular, we still have the limit on the maximum number of worker threads. Because of this, we cannot actually wait for all processes at the same time, and the dependence of the waiting time on the number of processes will therefore be O(n) multiplied by delay. This is almost the same as if we were polling all processes manually in a worker-thread loop.

We can rely on communication between processes via sockets, which is purely asynchronous (we can work with pipes and files in a truly asynchronous manner using APIs such as io_uring (Linux) or I/O rings (Windows 11), but unfortunately, they are still poorly supported, possibly due to security issues), but what about zombie processes and other nasties? We can also try to rely on the fact that as soon as a process is dead, the operating system closes the file descriptors (and thus sockets) opened by it, but how reliable is that?

If we take the combined approach as a basis, it can be improved by polling all those not in worker threads once every

In short, I believe that it is virtually impossible to solve the problem using threads and/or polling, unless we create a thread for each call/process (slow and heavy):
```python
async def longpoll_in_thread(blocking_call, delay):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Note: exceptions from blocking_call are actually suppressed after the
    # task is cancelled or the event loop is closed. Ideally, they should be
    # handled (especially if it is KeyboardInterrupt) or at least logged, but
    # this would also complicate the code.

    def set_result(future, result):
        try:
            future.set_result(result)
        except asyncio.InvalidStateError:  # task is cancelled
            pass

    def set_exception(future, exception):
        try:
            future.set_exception(exception)
        except asyncio.InvalidStateError:  # task is cancelled
            pass

    def longpoll(future):
        while not future.cancelled():
            try:
                result = blocking_call(delay)
            except BaseException as exc:
                try:
                    loop.call_soon_threadsafe(set_exception, future, exc)
                except RuntimeError:  # event loop is closed
                    pass
                finally:
                    del future
                break
            else:
                if result:
                    try:
                        loop.call_soon_threadsafe(set_result, future, result)
                    except RuntimeError:  # event loop is closed
                        break
                    else:
                        break

    # Note: it would be much better to wait for the thread to complete with
    # shielding from cancellation, but the correct implementation of shielding
    # in asyncio is too complicated, and in this case it would be more correct
    # to give an example with aiologic.
    threading.Thread(target=longpoll, args=[future], daemon=True).start()

    return await future
```
```python
try:
    if not done:
        done = await asyncio.wait_for(
            longpoll_in_thread(blocking_call, delay),
            timeout,
        )
except TimeoutError:
    pass  # done is False
else:
    pass  # done is True
```

For the last two examples,
@x42005e1f I see what you're saying. I had an idea so that Python versions that still do not have `asyncio.timeout(...)` can have backwards compatibility:

```python
async def join(self, timeout: Optional[int] = None) -> None:
    """Wait for the process to finish execution without blocking the main thread."""
    if not self.is_alive() and self.exitcode is None:
        raise ValueError("must start process before joining it")
    # XXX: some versions of Python do not come with asyncio.timeout(), so this
    # is kept for backwards compatibility
    if timeout:
        return await asyncio.wait_for(self.join, timeout)
    while self.exitcode is None:
        await asyncio.to_thread(self.aio_process.join, 0.005)
```
There is also a backport: graingert/taskgroup. But yes, it is redundant when the method can be called recursively.
```python
if timeout is not None:
    return await asyncio.wait_for(self.join(), timeout)
```

```python
if timeout:
    return await asyncio.wait_for(self.join, timeout)
```
These changes seem strange. `asyncio.wait_for()` expects an awaitable object, not a function. Also, this code will not work correctly for `timeout=0`. It is better to leave it unchanged here.
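The `timeout=0` pitfall can be demonstrated directly: an `if timeout:` guard silently takes the no-timeout path for zero, whereas `asyncio.wait_for()` with a zero timeout raises immediately for a pending awaitable:

```python
import asyncio

async def main():
    # With `if timeout:`, a caller passing timeout=0 would fall through to
    # the unbounded wait. wait_for(..., 0) instead times out at once when
    # the awaitable is not already done:
    try:
        await asyncio.wait_for(asyncio.sleep(1), 0)
    except asyncio.TimeoutError:
        return "timed out"
    return "completed"

result = asyncio.run(main())
print(result)  # → timed out
```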
Description
In an attempt to make the asynchronous portions of this library less reliant on polling, I decided to look for more finely tuned ways to check whether a process has closed. I plan to try making it so that a callback could be hooked in the future, or to see if there is a way to hook in a listener for a pid being closed, but this is the best I could come up with. I'll look into alternative solutions to further `await asyncio.sleep(0.005)` lines of code soon, but this is a start with what I had in mind.

I may change this, so treat it for right now as a draft or a DNM (Do Not Merge). Edit: it's ready