diff --git a/ci.sh b/ci.sh index f414d94a2c..08f51ecb5b 100755 --- a/ci.sh +++ b/ci.sh @@ -46,6 +46,7 @@ python -m uv pip install build python -m build wheel_package=$(ls dist/*.whl) python -m uv pip install "trio @ $wheel_package" -c test-requirements.txt +python -m uv pip install https://github.com/python-trio/outcome/archive/e0f317813a499f1a3629b37c3b8caed72825d9c0.zip # Actual tests # expands to 0 != 1 if NO_TEST_REQUIREMENTS is not set, if set the `-0` has no effect diff --git a/newsfragments/3229.bugfix.rst b/newsfragments/3229.bugfix.rst new file mode 100644 index 0000000000..28944b1a5f --- /dev/null +++ b/newsfragments/3229.bugfix.rst @@ -0,0 +1 @@ +Avoid holding refs to result/exception from ``trio.to_thread.run_sync``. diff --git a/src/trio/_core/_entry_queue.py b/src/trio/_core/_entry_queue.py index 988b45ca00..e0b77959b5 100644 --- a/src/trio/_core/_entry_queue.py +++ b/src/trio/_core/_entry_queue.py @@ -83,6 +83,8 @@ async def kill_everything( # noqa: RUF029 # await not used "Internal error: `parent_nursery` should never be `None`", ) from exc # pragma: no cover parent_nursery.start_soon(kill_everything, exc) + finally: + del sync_fn, args, job # This has to be carefully written to be safe in the face of new items # being queued while we iterate, and to do a bounded amount of work on diff --git a/src/trio/_core/_tests/test_run.py b/src/trio/_core/_tests/test_run.py index 7728a6f3d4..9c6237d55d 100644 --- a/src/trio/_core/_tests/test_run.py +++ b/src/trio/_core/_tests/test_run.py @@ -33,6 +33,7 @@ create_asyncio_future_in_new_loop, gc_collect_harder, ignore_coroutine_never_awaited_warnings, + no_other_refs, restore_unraisablehook, slow, ) @@ -2802,25 +2803,10 @@ async def spawn_tasks_in_old_nursery(task_status: _core.TaskStatus[None]) -> Non assert RaisesGroup(ValueError, ValueError).matches(excinfo.value.__cause__) -if sys.version_info >= (3, 11): - - def no_other_refs() -> list[object]: - return [] - -else: - - def no_other_refs() -> list[object]: - return [sys._getframe(1)] - - @pytest.mark.skipif( sys.implementation.name != "cpython", reason="Only makes sense with refcounting GC", ) -@pytest.mark.xfail( - sys.version_info >= (3, 14), - reason="https://github.com/python/cpython/issues/125603", -) async def test_ki_protection_doesnt_leave_cyclic_garbage() -> None: class MyException(Exception): pass diff --git a/src/trio/_core/_tests/tutil.py b/src/trio/_core/_tests/tutil.py index 063fa1dd80..80e8197ab5 100644 --- a/src/trio/_core/_tests/tutil.py +++ b/src/trio/_core/_tests/tutil.py @@ -115,3 +115,20 @@ def check_sequence_matches(seq: Sequence[T], template: Iterable[T | set[T]]) -> def create_asyncio_future_in_new_loop() -> asyncio.Future[object]: with closing(asyncio.new_event_loop()) as loop: return loop.create_future() + + +if sys.version_info >= (3, 14): + + def no_other_refs() -> list[object]: + gen = sys._getframe(1).f_generator + return [] if gen is None else [gen] + +elif sys.version_info >= (3, 11): + + def no_other_refs() -> list[object]: + return [] + +else: + + def no_other_refs() -> list[object]: + return [sys._getframe(1)] diff --git a/src/trio/_tests/test_threads.py b/src/trio/_tests/test_threads.py index 380da3833b..f05f2be9fc 100644 --- a/src/trio/_tests/test_threads.py +++ b/src/trio/_tests/test_threads.py @@ -1,6 +1,7 @@ from __future__ import annotations import contextvars +import gc import queue as stdlib_queue import re import sys @@ -29,7 +30,7 @@ sleep_forever, ) from .._core._tests.test_ki import ki_self -from .._core._tests.tutil import slow +from .._core._tests.tutil import no_other_refs, slow from .._threads import ( active_thread_count, current_default_thread_limiter, @@ -1141,3 +1142,55 @@ async def wait_no_threads_left() -> None: async def test_wait_all_threads_completed_no_threads() -> None: await wait_all_threads_completed() assert active_thread_count() == 0 + + +@pytest.mark.skipif( + sys.implementation.name == "pypy", + reason=( + "gc.get_referrers is broken on PyPy (see " + "https://github.com/pypy/pypy/issues/5075)" + ), +) +async def test_run_sync_worker_references() -> None: + class Foo: + pass + + def foo(_: Foo) -> Foo: + return Foo() + + cvar = contextvars.ContextVar[Foo]("cvar") + contextval = Foo() + arg = Foo() + cvar.set(contextval) + v = await to_thread_run_sync(foo, arg) + + cvar.set(Foo()) + + assert gc.get_referrers(contextval) == no_other_refs() + assert gc.get_referrers(foo) == no_other_refs() + assert gc.get_referrers(arg) == no_other_refs() + assert gc.get_referrers(v) == no_other_refs() + + +@pytest.mark.skipif( + sys.implementation.name == "pypy", + reason=( + "gc.get_referrers is broken on PyPy (see " + "https://github.com/pypy/pypy/issues/5075)" + ), +) +async def test_run_sync_workerreferences_exc() -> None: + + class MyException(Exception): + pass + + def throw() -> None: + raise MyException + + e = None + try: + await to_thread_run_sync(throw) + except MyException as err: + e = err + + assert gc.get_referrers(e) == no_other_refs() diff --git a/src/trio/_threads.py b/src/trio/_threads.py index 394e5b06ac..1894799ec7 100644 --- a/src/trio/_threads.py +++ b/src/trio/_threads.py @@ -443,17 +443,19 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort: msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = ( await trio.lowlevel.wait_task_rescheduled(abort) ) - if isinstance(msg_from_thread, outcome.Outcome): - return msg_from_thread.unwrap() - elif isinstance(msg_from_thread, Run): - await msg_from_thread.run() - elif isinstance(msg_from_thread, RunSync): - msg_from_thread.run_sync() - else: # pragma: no cover, internal debugging guard TODO: use assert_never - raise TypeError( - f"trio.to_thread.run_sync received unrecognized thread message {msg_from_thread!r}.", - ) - del msg_from_thread + try: + if isinstance(msg_from_thread, outcome.Outcome): + return msg_from_thread.unwrap() + elif isinstance(msg_from_thread, Run): + await msg_from_thread.run() + elif isinstance(msg_from_thread, RunSync): + msg_from_thread.run_sync() + else: # pragma: no cover, internal debugging guard TODO: use assert_never + raise TypeError( + f"trio.to_thread.run_sync received unrecognized thread message {msg_from_thread!r}.", + ) + finally: + del msg_from_thread def from_thread_check_cancelled() -> None: