Skip to content

Commit 2a66a0d

Browse files
authored
Merge pull request #3081 from jakkdl/break_the_lot
Add ability to break parking lots, stop locks from stalling
2 parents d0158fa + 92f9799 commit 2a66a0d

File tree

10 files changed

+357
-8
lines changed

10 files changed

+357
-8
lines changed

docs/source/reference-lowlevel.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,10 @@ Wait queue abstraction
393393
.. autoclass:: ParkingLotStatistics
394394
:members:
395395

396+
.. autofunction:: add_parking_lot_breaker
397+
398+
.. autofunction:: remove_parking_lot_breaker
399+
396400
Low-level checkpoint functions
397401
------------------------------
398402

newsfragments/3035.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:class:`trio.Lock` and :class:`trio.StrictFIFOLock` will now raise :exc:`trio.BrokenResourceError` when :meth:`trio.Lock.acquire` would previously stall due to the owner of the lock exiting without releasing the lock.

newsfragments/3081.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added :func:`trio.lowlevel.add_parking_lot_breaker` and :func:`trio.lowlevel.remove_parking_lot_breaker` to allow creating custom lock/semaphore implementations that will break their underlying parking lot if a task exits unexpectedly. :meth:`trio.lowlevel.ParkingLot.break_lot` is also added, to allow breaking a parking lot intentionally.

src/trio/_core/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
from ._ki import currently_ki_protected, disable_ki_protection, enable_ki_protection
2121
from ._local import RunVar, RunVarToken
2222
from ._mock_clock import MockClock
23-
from ._parking_lot import ParkingLot, ParkingLotStatistics
23+
from ._parking_lot import (
24+
ParkingLot,
25+
ParkingLotStatistics,
26+
add_parking_lot_breaker,
27+
remove_parking_lot_breaker,
28+
)
2429

2530
# Imports that always exist
2631
from ._run import (

src/trio/_core/_parking_lot.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,13 @@
7171
# See: https://github.com/python-trio/trio/issues/53
7272
from __future__ import annotations
7373

74+
import inspect
7475
import math
7576
from collections import OrderedDict
7677
from typing import TYPE_CHECKING
7778

7879
import attrs
80+
import outcome
7981

8082
from .. import _core
8183
from .._util import final
@@ -86,6 +88,37 @@
8688
from ._run import Task
8789

8890

91+
GLOBAL_PARKING_LOT_BREAKER: dict[Task, list[ParkingLot]] = {}
92+
93+
94+
def add_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
95+
"""Register a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`.
96+
97+
raises:
98+
trio.BrokenResourceError: if the task has already exited.
99+
"""
100+
if inspect.getcoroutinestate(task.coro) == inspect.CORO_CLOSED:
101+
raise _core._exceptions.BrokenResourceError(
102+
"Attempted to add already exited task as lot breaker.",
103+
)
104+
if task not in GLOBAL_PARKING_LOT_BREAKER:
105+
GLOBAL_PARKING_LOT_BREAKER[task] = [lot]
106+
else:
107+
GLOBAL_PARKING_LOT_BREAKER[task].append(lot)
108+
109+
110+
def remove_parking_lot_breaker(task: Task, lot: ParkingLot) -> None:
111+
"""Deregister a task as a breaker for a lot. See :meth:`ParkingLot.break_lot`"""
112+
try:
113+
GLOBAL_PARKING_LOT_BREAKER[task].remove(lot)
114+
except (KeyError, ValueError):
115+
raise RuntimeError(
116+
"Attempted to remove task as breaker for a lot it is not registered for",
117+
) from None
118+
if not GLOBAL_PARKING_LOT_BREAKER[task]:
119+
del GLOBAL_PARKING_LOT_BREAKER[task]
120+
121+
89122
@attrs.frozen
90123
class ParkingLotStatistics:
91124
"""An object containing debugging information for a ParkingLot.
@@ -118,6 +151,7 @@ class ParkingLot:
118151
# {task: None}, we just want a deque where we can quickly delete random
119152
# items
120153
_parked: OrderedDict[Task, None] = attrs.field(factory=OrderedDict, init=False)
154+
broken_by: list[Task] = attrs.field(factory=list, init=False)
121155

122156
def __len__(self) -> int:
123157
"""Returns the number of parked tasks."""
@@ -136,7 +170,15 @@ async def park(self) -> None:
136170
"""Park the current task until woken by a call to :meth:`unpark` or
137171
:meth:`unpark_all`.
138172
173+
Raises:
174+
BrokenResourceError: if attempting to park in a broken lot, or the lot
175+
breaks before we get to unpark.
176+
139177
"""
178+
if self.broken_by:
179+
raise _core.BrokenResourceError(
180+
f"Attempted to park in parking lot broken by {self.broken_by}",
181+
)
140182
task = _core.current_task()
141183
self._parked[task] = None
142184
task.custom_sleep_data = self
@@ -234,6 +276,35 @@ def repark_all(self, new_lot: ParkingLot) -> None:
234276
"""
235277
return self.repark(new_lot, count=len(self))
236278

279+
def break_lot(self, task: Task | None = None) -> None:
280+
"""Break this lot, with ``task`` noted as the task that broke it.
281+
282+
This causes all parked tasks to raise an error, and any
283+
future tasks attempting to park to error. Unpark & repark become no-ops as the
284+
parking lot is empty.
285+
286+
The error raised contains a reference to the task sent as a parameter. The task
287+
is also saved in the parking lot in the ``broken_by`` attribute.
288+
"""
289+
if task is None:
290+
task = _core.current_task()
291+
292+
# if lot is already broken, just mark this as another breaker and return
293+
if self.broken_by:
294+
self.broken_by.append(task)
295+
return
296+
297+
self.broken_by.append(task)
298+
299+
for parked_task in self._parked:
300+
_core.reschedule(
301+
parked_task,
302+
outcome.Error(
303+
_core.BrokenResourceError(f"Parking lot broken by {task}"),
304+
),
305+
)
306+
self._parked.clear()
307+
237308
def statistics(self) -> ParkingLotStatistics:
238309
"""Return an object containing debugging information.
239310

src/trio/_core/_run.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from ._exceptions import Cancelled, RunFinishedError, TrioInternalError
4141
from ._instrumentation import Instruments
4242
from ._ki import LOCALS_KEY_KI_PROTECTION_ENABLED, KIManager, enable_ki_protection
43+
from ._parking_lot import GLOBAL_PARKING_LOT_BREAKER
4344
from ._thread_cache import start_thread_soon
4445
from ._traps import (
4546
Abort,
@@ -1896,6 +1897,12 @@ async def python_wrapper(orig_coro: Awaitable[RetT]) -> RetT:
18961897
return task
18971898

18981899
def task_exited(self, task: Task, outcome: Outcome[Any]) -> None:
1900+
# break parking lots associated with the exiting task
1901+
if task in GLOBAL_PARKING_LOT_BREAKER:
1902+
for lot in GLOBAL_PARKING_LOT_BREAKER[task]:
1903+
lot.break_lot(task)
1904+
del GLOBAL_PARKING_LOT_BREAKER[task]
1905+
18991906
if (
19001907
task._cancel_status is not None
19011908
and task._cancel_status.abandoned_by_misnesting

src/trio/_core/_tests/test_parking_lot.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
from __future__ import annotations
22

3+
import re
34
from typing import TypeVar
45

56
import pytest
67

8+
import trio
9+
from trio.lowlevel import (
10+
add_parking_lot_breaker,
11+
current_task,
12+
remove_parking_lot_breaker,
13+
)
14+
from trio.testing import Matcher, RaisesGroup
15+
716
from ... import _core
817
from ...testing import wait_all_tasks_blocked
918
from .._parking_lot import ParkingLot
@@ -215,3 +224,161 @@ async def test_parking_lot_repark_with_count() -> None:
215224
"wake 2",
216225
]
217226
lot1.unpark_all()
227+
228+
229+
async def dummy_task(
230+
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
231+
) -> None:
232+
task_status.started(_core.current_task())
233+
await trio.sleep_forever()
234+
235+
236+
async def test_parking_lot_breaker_basic() -> None:
237+
"""Test basic functionality for breaking lots."""
238+
lot = ParkingLot()
239+
task = current_task()
240+
241+
# defaults to current task
242+
lot.break_lot()
243+
assert lot.broken_by == [task]
244+
245+
# breaking the lot again with the same task appends another copy in `broken_by`
246+
lot.break_lot()
247+
assert lot.broken_by == [task, task]
248+
249+
# trying to park in broken lot errors
250+
broken_by_str = re.escape(str([task, task]))
251+
with pytest.raises(
252+
_core.BrokenResourceError,
253+
match=f"^Attempted to park in parking lot broken by {broken_by_str}$",
254+
):
255+
await lot.park()
256+
257+
258+
async def test_parking_lot_break_parking_tasks() -> None:
259+
"""Checks that tasks currently waiting to park raise an error when the breaker exits."""
260+
261+
async def bad_parker(lot: ParkingLot, scope: _core.CancelScope) -> None:
262+
add_parking_lot_breaker(current_task(), lot)
263+
with scope:
264+
await trio.sleep_forever()
265+
266+
lot = ParkingLot()
267+
cs = _core.CancelScope()
268+
269+
# check that parked task errors
270+
with RaisesGroup(
271+
Matcher(_core.BrokenResourceError, match="^Parking lot broken by"),
272+
):
273+
async with _core.open_nursery() as nursery:
274+
nursery.start_soon(bad_parker, lot, cs)
275+
await wait_all_tasks_blocked()
276+
277+
nursery.start_soon(lot.park)
278+
await wait_all_tasks_blocked()
279+
280+
cs.cancel()
281+
282+
283+
async def test_parking_lot_breaker_registration() -> None:
284+
lot = ParkingLot()
285+
task = current_task()
286+
287+
with pytest.raises(
288+
RuntimeError,
289+
match="Attempted to remove task as breaker for a lot it is not registered for",
290+
):
291+
remove_parking_lot_breaker(task, lot)
292+
293+
# check that a task can be registered as breaker for the same lot multiple times
294+
add_parking_lot_breaker(task, lot)
295+
add_parking_lot_breaker(task, lot)
296+
remove_parking_lot_breaker(task, lot)
297+
remove_parking_lot_breaker(task, lot)
298+
299+
with pytest.raises(
300+
RuntimeError,
301+
match="Attempted to remove task as breaker for a lot it is not registered for",
302+
):
303+
remove_parking_lot_breaker(task, lot)
304+
305+
# registering a task as breaker on an already broken lot is fine
306+
lot.break_lot()
307+
child_task = None
308+
async with trio.open_nursery() as nursery:
309+
child_task = await nursery.start(dummy_task)
310+
add_parking_lot_breaker(child_task, lot)
311+
nursery.cancel_scope.cancel()
312+
assert lot.broken_by == [task, child_task]
313+
314+
# manually breaking a lot with an already exited task is fine
315+
lot = ParkingLot()
316+
lot.break_lot(child_task)
317+
assert lot.broken_by == [child_task]
318+
319+
320+
async def test_parking_lot_breaker_rebreak() -> None:
321+
lot = ParkingLot()
322+
task = current_task()
323+
lot.break_lot()
324+
325+
# breaking an already broken lot with a different task is allowed
326+
# The nursery is only to create a task we can pass to lot.break_lot
327+
async with trio.open_nursery() as nursery:
328+
child_task = await nursery.start(dummy_task)
329+
lot.break_lot(child_task)
330+
nursery.cancel_scope.cancel()
331+
332+
assert lot.broken_by == [task, child_task]
333+
334+
335+
async def test_parking_lot_multiple_breakers_exit() -> None:
336+
# register multiple tasks as lot breakers, then have them all exit
337+
lot = ParkingLot()
338+
async with trio.open_nursery() as nursery:
339+
child_task1 = await nursery.start(dummy_task)
340+
child_task2 = await nursery.start(dummy_task)
341+
child_task3 = await nursery.start(dummy_task)
342+
add_parking_lot_breaker(child_task1, lot)
343+
add_parking_lot_breaker(child_task2, lot)
344+
add_parking_lot_breaker(child_task3, lot)
345+
nursery.cancel_scope.cancel()
346+
347+
# I think the order is guaranteed currently, but doesn't hurt to be safe.
348+
assert set(lot.broken_by) == {child_task1, child_task2, child_task3}
349+
350+
351+
async def test_parking_lot_breaker_register_exited_task() -> None:
352+
lot = ParkingLot()
353+
child_task = None
354+
async with trio.open_nursery() as nursery:
355+
child_task = await nursery.start(dummy_task)
356+
nursery.cancel_scope.cancel()
357+
# trying to register an exited task as lot breaker errors
358+
with pytest.raises(
359+
trio.BrokenResourceError,
360+
match="^Attempted to add already exited task as lot breaker.$",
361+
):
362+
add_parking_lot_breaker(child_task, lot)
363+
364+
365+
async def test_parking_lot_break_itself() -> None:
366+
"""Break a parking lot, where the breakee is parked.
367+
Doing this is weird, but should probably be supported.
368+
"""
369+
370+
async def return_me_and_park(
371+
lot: ParkingLot,
372+
*,
373+
task_status: _core.TaskStatus[_core.Task] = trio.TASK_STATUS_IGNORED,
374+
) -> None:
375+
task_status.started(_core.current_task())
376+
await lot.park()
377+
378+
lot = ParkingLot()
379+
with RaisesGroup(
380+
Matcher(_core.BrokenResourceError, match="^Parking lot broken by"),
381+
):
382+
async with _core.open_nursery() as nursery:
383+
child_task = await nursery.start(return_me_and_park, lot)
384+
lot.break_lot(child_task)

0 commit comments

Comments
 (0)