
Conversation

Vizonex commented Oct 17, 2025

Description

In an attempt to make the asynchronous portions of this library less reliant on polling, I tried to find more fine-tuned ways of checking whether a process has exited. In the future I plan to make it possible to hook in a callback, or to find a way to register a listener for a pid being closed, but this is the best I could come up with so far. I'll keep looking for alternatives to the remaining await asyncio.sleep(0.005) lines of code, but this is a start on what I had in mind.

I may change this, so treat it for right now as a draft or a DNM (do not merge).

Edit: it's ready


x42005e1f commented Oct 18, 2025

There are two things to consider here: performance and cancellability.

Suppose we have two ways to wait for something:

  1. Use blocking_call(timeout).
  2. Poll via a Boolean flag done + sleep(delay).

Obviously, we cannot use blocking calls directly in an asynchronous environment, because otherwise they will block the event loop. Therefore, they are usually run in separate threads:

await asyncio.to_thread(blocking_call, timeout)

At first glance, everything works fine, and the wait is as fast as possible (not counting the overhead due to multithreading). But there are two nuances here:

  1. Threads cannot be killed (except for some dirty and platform-specific tricks), which means that the blocking call cannot actually be cancelled. This leads to thread leaks.
  2. When no separate executor is used, the default one created by the event loop is used. And, of course, it has a limit on the number of worker threads - once that limit is reached, no further blocking calls will even be passed to any thread, which leads to hangs.
>>> # in the asyncio REPL (python -m asyncio), which allows top-level await
>>> import asyncio
>>> import os
>>> import sys
>>> import threading
>>> if sys.version_info >= (3, 13):
...     max_workers = min(32, (os.process_cpu_count() or 1) + 4)
... else:
...     max_workers = min(32, (os.cpu_count() or 1) + 4)
...     
>>> max_workers
8
>>> threading.active_count()
2  # == initial_count
>>> event = threading.Event()
>>> for _ in range(max_workers):
...     try:
...         await asyncio.wait_for(asyncio.to_thread(event.wait, 60), 1 / max_workers)
...     except TimeoutError:
...         pass
...         
>>> threading.active_count()
10  # == 2 + 8 == initial_count + max_workers
>>> await asyncio.to_thread(print, "ok")  # hangs!

Now let us look at polling. It usually looks something like this:

while not done:
    await asyncio.sleep(delay)

It has no cancellability issues, since asyncio.sleep(delay) is a purely asynchronous call. However, its performance suffers:

  1. When choosing delay, we are looking for a compromise between processor load and response speed. If the delay is too low, we will just waste processor cycles. But if it is too high, we will waste time.
  2. The delay affects the number of operations per second. Even if we have a modern system capable of performing thousands of process launches per second, our average number of operations per second will usually be limited by that delay (for 0.005, it is only 200 operations per second!).
>>> import asyncio
>>> import random
>>> import time
>>> delay = 0.005
>>> iterations = 100
>>> real_ops = 10_000
>>> ops = [None] * iterations
>>> for i in range(iterations):
...     task = asyncio.create_task(asyncio.sleep((1 / real_ops) * random.random()))
...     await asyncio.sleep(0)  # switch to the task
...     start = time.perf_counter()
...     while not task.done():
...         await asyncio.sleep(delay)
...     ops[i] = 1 / (time.perf_counter() - start)
...     
>>> sorted(ops)[len(ops) // 2]  # the median
185.88300231336297  # ~= (1 / 0.005) < 10_000

The most viable option is to combine both ways:

try:
    if not done:
        async with asyncio.timeout(timeout):
            while not done:
                done = await asyncio.to_thread(blocking_call, delay)
except TimeoutError:
    pass  # done is False
else:
    pass  # done is True

Key benefits:

  1. If there is no need to wait (done = self.exitcode is not None, and it evaluates to True), neither threads nor anything else is used, which is very cheap and fast.
  2. It responds well to cancellation if a sufficiently low delay is chosen, which mitigates the cancellability issue.
  3. The number of operations per second is not limited by delay (but suffers from multithreading overhead).
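
As a concrete illustration, here is a minimal sketch (not the PR's actual code) of the combined pattern applied to a join() method, assuming a synchronous self.aio_process.join(timeout) and a self.exitcode attribute as in this PR; note that asyncio.timeout() requires Python 3.11+:

import asyncio
from typing import Optional

async def join(self, timeout: Optional[float] = None, delay: float = 0.005) -> None:
    if self.exitcode is not None:
        return  # fast path: nothing to wait for, no thread is spawned

    try:
        async with asyncio.timeout(timeout):  # timeout=None means no limit
            while self.exitcode is None:
                # join(delay) returns after at most `delay` seconds, so the
                # worker thread finishes quickly and cancellation stays cheap
                await asyncio.to_thread(self.aio_process.join, delay)
    except TimeoutError:
        pass  # like multiprocessing.Process.join(), return None on timeout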

However, even this approach still has problems. In particular, we still have the limit on the maximum number of worker threads. Because of this, we cannot actually wait for all processes at the same time, and the dependence of the waiting time on the number of processes will therefore be O(n) multiplied by delay. This is almost the same as if we were polling all processes manually in a worker thread loop.

We can rely on communication between processes via sockets, which is purely asynchronous, but what about zombie processes and other nasties? (We could work with pipes and files in a truly asynchronous manner using APIs such as io_uring (Linux) or I/O rings (Windows 11), but unfortunately they are still poorly supported, possibly due to security issues.) We can also try to rely on the fact that as soon as a process dies, the operating system closes the file descriptors (and thus sockets) it had open, but how reliable is that?
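
Incidentally, multiprocessing already exposes that second idea as Process.sentinel: on POSIX it is the read end of a pipe whose write end is held by the child, so it becomes readable (EOF) exactly when the process ends. A POSIX-only sketch of hooking it into the event loop (not part of this PR; on Windows the sentinel is a process handle, and the proactor event loop has no add_reader()):

import asyncio
import multiprocessing
from typing import Optional

async def wait_via_sentinel(process: multiprocessing.Process) -> Optional[int]:
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def on_ready() -> None:
        # EOF keeps the fd readable forever, so unregister before
        # the callback can fire a second time
        loop.remove_reader(process.sentinel)
        if not done.done():
            done.set_result(None)

    loop.add_reader(process.sentinel, on_ready)
    try:
        await done
    finally:
        loop.remove_reader(process.sentinel)  # no-op if already removed
    process.join()  # reap the zombie; returns immediately at this point
    return process.exitcode

This also partially answers the zombie question: the wakeup itself is purely asynchronous, and a follow-up join() reaps the child without blocking.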

If we take the combined approach as a basis, it can be improved by polling all the waits that did not get a worker thread once every delay seconds (via self.exitcode or something else non-blocking). This way we can avoid the performance drop caused by the call queue, but it is much more difficult to implement and still far from ideal (we are just reducing the worst case to the same polling); a rough sketch follows.
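
As an illustration only, a deliberately simplified sketch of that idea, with hypothetical names (HybridWaiter, is_done) that are not part of this PR:

import asyncio

class HybridWaiter:
    """Use worker threads while they are free; fall back to polling otherwise."""

    def __init__(self, max_workers: int, delay: float) -> None:
        self._slots = asyncio.Semaphore(max_workers)
        self._delay = delay

    async def wait(self, blocking_call, is_done) -> None:
        # is_done() must be non-blocking, e.g. lambda: proc.exitcode is not None
        if self._slots.locked():  # no free worker thread: degrade to polling
            while not is_done():
                await asyncio.sleep(self._delay)
        else:
            async with self._slots:
                while not is_done():
                    await asyncio.to_thread(blocking_call, self._delay)

A real implementation would share a single polling task among all the fallback waiters instead of one sleep loop per wait, but the worst case is the same plain polling.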

In short, I believe that it is virtually impossible to solve the problem using threads and/or polling, unless we create a thread for each call/process (slow and heavy):

import asyncio
import threading

async def longpoll_in_thread(blocking_call, delay):
    loop = asyncio.get_running_loop()

    future = loop.create_future()

    # Note: exceptions from blocking_call are actually suppressed after the
    # task is cancelled or the event loop is closed. Ideally, they should be
    # handled (especially if it is KeyboardInterrupt) or at least logged, but
    # this will also complicate the code.

    def set_result(future, result):
        try:
            future.set_result(result)
        except asyncio.InvalidStateError:  # task is cancelled
            pass

    def set_exception(future, exception):
        try:
            future.set_exception(exception)
        except asyncio.InvalidStateError:  # task is cancelled
            pass

    def longpoll(future):
        while not future.cancelled():
            try:
                result = blocking_call(delay)
            except BaseException as exc:
                try:
                    loop.call_soon_threadsafe(set_exception, future, exc)
                except RuntimeError:  # event loop is closed
                    pass
                finally:
                    del future  # drop this thread's reference to the future
                break  # the wait is over either way
            else:
                if result:
                    try:
                        loop.call_soon_threadsafe(set_result, future, result)
                    except RuntimeError:  # event loop is closed
                        break
                    else:
                        break

    # Note: it would be much better to wait for the thread to complete with
    # shielding from cancellation, but the correct implementation of shielding
    # in asyncio is too complicated, and in this case it would be more correct
    # to give an example with aiologic.

    threading.Thread(target=longpoll, args=[future], daemon=True).start()

    return await future

try:
    if not done:
        done = await asyncio.wait_for(
            longpoll_in_thread(blocking_call, delay),
            timeout,
        )
except TimeoutError:
    pass  # done is False
else:
    pass  # done is True

For the last two examples, delay determines the response speed to cancellation. Since it effectively affects only the thread's shutdown time, it makes sense to make it large enough that waiting for a large number of processes requires fewer resources (unless, of course, waiting for the thread to complete is implemented).


Vizonex commented Oct 19, 2025

@x42005e1f I see what you're saying. I had an idea so that Python versions that still do not have asyncio.timeout(...) can have backwards compatibility:

async def join(self, timeout: Optional[int] = None) -> None:
    """Wait for the process to finish execution without blocking the main thread."""
    if not self.is_alive() and self.exitcode is None:
        raise ValueError("must start process before joining it")

    # XXX: some versions of python do not come with asyncio.timeout() so this is kept for backwards compatibility
    if timeout:
        return await asyncio.wait_for(self.join, timeout)

    while self.exitcode is None:
        await asyncio.to_thread(self.aio_process.join, 0.005)

Vizonex marked this pull request as ready for review October 19, 2025 01:34
@x42005e1f

> I had an idea so that Python versions that still do not have asyncio.timeout(...) can have backwards compatibility

There is also a backport: graingert/taskgroup. But yes, it is redundant when the method can be called recursively.

Comment on lines -163 to +164
-    if timeout is not None:
-        return await asyncio.wait_for(self.join(), timeout)
+    if timeout:
+        return await asyncio.wait_for(self.join, timeout)

These changes seem strange. asyncio.wait_for() expects an awaitable object, not a function. Also, this code will not work correctly for timeout=0. It is better to leave it unchanged here.
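
Both pitfalls are easy to reproduce in isolation (a standalone snippet, not from the PR):

import asyncio

async def main() -> None:
    async def work() -> None:
        await asyncio.sleep(10)

    # 1. Passing a callable instead of an awaitable raises TypeError:
    try:
        await asyncio.wait_for(work, 1.0)  # note: work, not work()
    except TypeError as exc:
        print("TypeError:", exc)

    # 2. timeout=0 is a valid value ("time out immediately"), but it is
    # falsy, so `if timeout:` silently skips the wait_for() branch:
    try:
        await asyncio.wait_for(work(), 0)
    except TimeoutError:
        print("timed out immediately, as expected")

asyncio.run(main())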
