Skip to content

Commit c56a7aa

Browse files
Fix matplotlib eventloops (#1458)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent c7af34c commit c56a7aa

File tree

5 files changed

+69
-139
lines changed

5 files changed

+69
-139
lines changed

.github/workflows/downstream.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ jobs:
117117
shell: bash -l {0}
118118
run: |
119119
cd ${GITHUB_WORKSPACE}/../qtconsole
120-
xvfb-run --auto-servernum ${pythonLocation}/bin/python -m pytest -x -vv -s --full-trace --color=yes qtconsole
120+
xvfb-run --auto-servernum ${pythonLocation}/bin/python -m pytest -x -vv -s --full-trace --color=yes qtconsole -k "not test_scroll"
121121
122122
spyder_kernels:
123123
runs-on: ubuntu-latest

ipykernel/eventloops.py

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ def register_integration(*toolkitnames):
4040
You can provide alternative names for the same toolkit.
4141
4242
The decorated function should take a single argument, the IPython kernel
43-
instance, arrange for the event loop to call ``kernel.do_one_iteration()``
44-
at least every ``kernel._poll_interval`` seconds, and start the event loop.
43+
instance, arrange for the event loop to yield the asyncio loop when a
44+
message is received by the main shell zmq stream or at least every
45+
``kernel._poll_interval`` seconds, and start the event loop.
4546
4647
:mod:`ipykernel.eventloops` provides and registers such functions
4748
for a few common event loops.
@@ -68,6 +69,15 @@ def exit_decorator(exit_func):
6869
return decorator
6970

7071

72+
def get_shell_stream(kernel):
73+
# Return the zmq stream that receives messages for the main shell.
74+
if kernel._supports_kernel_subshells:
75+
manager = kernel.shell_channel_thread.manager
76+
socket_pair = manager.get_shell_channel_to_subshell_pair(None)
77+
return socket_pair.to_stream
78+
return kernel.shell_stream
79+
80+
7181
def _notify_stream_qt(kernel):
7282
import operator
7383
from functools import lru_cache
@@ -87,17 +97,20 @@ def exit_loop():
8797
kernel._qt_notifier.setEnabled(False)
8898
kernel.app.qt_event_loop.quit()
8999

90-
def process_stream_events():
100+
def process_stream_events_wrap(shell_stream, *args, **kwargs):
91101
"""fall back to main loop when there's a socket event"""
92102
# call flush to ensure that the stream doesn't lose events
93103
# due to our consuming of the edge-triggered FD
94104
# flush returns the number of events consumed.
95105
# if there were any, wake it up
96-
if kernel.shell_stream.flush(limit=1):
106+
if shell_stream.flush(limit=1):
97107
exit_loop()
98108

109+
shell_stream = get_shell_stream(kernel)
110+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
111+
99112
if not hasattr(kernel, "_qt_notifier"):
100-
fd = kernel.shell_stream.getsockopt(zmq.FD)
113+
fd = shell_stream.getsockopt(zmq.FD)
101114
kernel._qt_notifier = QtCore.QSocketNotifier(
102115
fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop
103116
)
@@ -177,9 +190,11 @@ def loop_wx(kernel):
177190
# Wx uses milliseconds
178191
poll_interval = int(1000 * kernel._poll_interval)
179192

180-
def wake():
193+
shell_stream = get_shell_stream(kernel)
194+
195+
def wake(shell_stream):
181196
"""wake from wx"""
182-
if kernel.shell_stream.flush(limit=1):
197+
if shell_stream.flush(limit=1):
183198
kernel.app.ExitMainLoop()
184199
return
185200

@@ -201,7 +216,7 @@ def on_timer(self, event):
201216
# wx.Timer to defer back to the tornado event loop.
202217
class IPWxApp(wx.App): # type:ignore[misc]
203218
def OnInit(self):
204-
self.frame = TimerFrame(wake)
219+
self.frame = TimerFrame(partial(wake, shell_stream))
205220
self.frame.Show(False)
206221
return True
207222

@@ -248,14 +263,14 @@ def __init__(self, app):
248263

249264
def exit_loop():
250265
"""fall back to main loop"""
251-
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
266+
app.tk.deletefilehandler(shell_stream.getsockopt(zmq.FD))
252267
app.quit()
253268
app.destroy()
254269
del kernel.app_wrapper
255270

256-
def process_stream_events(*a, **kw):
271+
def process_stream_events_wrap(shell_stream, *a, **kw):
257272
"""fall back to main loop when there's a socket event"""
258-
if kernel.shell_stream.flush(limit=1):
273+
if shell_stream.flush(limit=1):
259274
exit_loop()
260275

261276
# allow for scheduling exits from the loop in case a timeout needs to
@@ -268,9 +283,10 @@ def _schedule_exit(delay):
268283

269284
# For Tkinter, we create a Tk object and call its withdraw method.
270285
kernel.app_wrapper = BasicAppWrapper(app)
271-
app.tk.createfilehandler(
272-
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
273-
)
286+
shell_stream = get_shell_stream(kernel)
287+
process_stream_events = partial(process_stream_events_wrap, shell_stream)
288+
289+
app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events)
274290
# schedule initial call after start
275291
app.after(0, process_stream_events)
276292

@@ -283,15 +299,19 @@ def _schedule_exit(delay):
283299

284300
nest_asyncio.apply()
285301

286-
doi = kernel.do_one_iteration
287302
# Tk uses milliseconds
288303
poll_interval = int(1000 * kernel._poll_interval)
289304

305+
shell_stream = get_shell_stream(kernel)
306+
290307
class TimedAppWrapper:
291-
def __init__(self, app, func):
308+
def __init__(self, app, shell_stream):
292309
self.app = app
310+
self.shell_stream = shell_stream
293311
self.app.withdraw()
294-
self.func = func
312+
313+
async def func(self):
314+
self.shell_stream.flush(limit=1)
295315

296316
def on_timer(self):
297317
loop = asyncio.get_event_loop()
@@ -305,16 +325,18 @@ def start(self):
305325
self.on_timer() # Call it once to get things going.
306326
self.app.mainloop()
307327

308-
kernel.app_wrapper = TimedAppWrapper(app, doi)
328+
kernel.app_wrapper = TimedAppWrapper(app, shell_stream)
309329
kernel.app_wrapper.start()
310330

311331

312332
@loop_tk.exit
313333
def loop_tk_exit(kernel):
314334
"""Exit the tk loop."""
315335
try:
336+
kernel.app_wrapper.app.quit()
316337
kernel.app_wrapper.app.destroy()
317338
del kernel.app_wrapper
339+
kernel.eventloop = None
318340
except (RuntimeError, AttributeError):
319341
pass
320342

@@ -359,6 +381,7 @@ def loop_cocoa(kernel):
359381
from ._eventloop_macos import mainloop, stop
360382

361383
real_excepthook = sys.excepthook
384+
shell_stream = get_shell_stream(kernel)
362385

363386
def handle_int(etype, value, tb):
364387
"""don't let KeyboardInterrupts look like crashes"""
@@ -377,7 +400,7 @@ def handle_int(etype, value, tb):
377400
# don't let interrupts during mainloop invoke crash_handler:
378401
sys.excepthook = handle_int
379402
mainloop(kernel._poll_interval)
380-
if kernel.shell_stream.flush(limit=1):
403+
if shell_stream.flush(limit=1):
381404
# events to process, return control to kernel
382405
return
383406
except BaseException:
@@ -415,13 +438,14 @@ def loop_asyncio(kernel):
415438
loop._should_close = False # type:ignore[attr-defined]
416439

417440
# pause eventloop when there's an event on a zmq socket
418-
def process_stream_events(stream):
441+
def process_stream_events(shell_stream):
419442
"""fall back to main loop when there's a socket event"""
420-
if stream.flush(limit=1):
443+
if shell_stream.flush(limit=1):
421444
loop.stop()
422445

423-
notifier = partial(process_stream_events, kernel.shell_stream)
424-
loop.add_reader(kernel.shell_stream.getsockopt(zmq.FD), notifier)
446+
shell_stream = get_shell_stream(kernel)
447+
notifier = partial(process_stream_events, shell_stream)
448+
loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier)
425449
loop.call_soon(notifier)
426450

427451
while True:

ipykernel/kernelbase.py

Lines changed: 4 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import asyncio
88
import inspect
9-
import itertools
109
import logging
1110
import os
1211
import socket
@@ -42,7 +41,6 @@
4241
from IPython.core.error import StdinNotImplementedError
4342
from jupyter_client.session import Session
4443
from tornado import ioloop
45-
from tornado.queues import Queue, QueueEmpty
4644
from traitlets.config.configurable import SingletonConfigurable
4745
from traitlets.traitlets import (
4846
Any,
@@ -511,11 +509,6 @@ async def advance_eventloop():
511509
if self.eventloop is not eventloop:
512510
self.log.info("exiting eventloop %s", eventloop)
513511
return
514-
if self.msg_queue.qsize():
515-
self.log.debug("Delaying eventloop due to waiting messages")
516-
# still messages to process, make the eventloop wait
517-
schedule_next()
518-
return
519512
self.log.debug("Advancing eventloop %s", eventloop)
520513
try:
521514
eventloop(self)
@@ -534,98 +527,18 @@ def schedule_next():
534527
# already consumed from the queue by process_one and the queue is
535528
# technically empty.
536529
self.log.debug("Scheduling eventloop advance")
537-
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))
530+
self.io_loop.call_later(0.001, advance_eventloop)
538531

539532
# begin polling the eventloop
540533
schedule_next()
541534

542-
async def do_one_iteration(self):
543-
"""Process a single shell message
544-
545-
Any pending control messages will be flushed as well
546-
547-
.. versionchanged:: 5
548-
This is now a coroutine
549-
"""
550-
# flush messages off of shell stream into the message queue
551-
if self.shell_stream and not self._supports_kernel_subshells:
552-
self.shell_stream.flush()
553-
# process at most one shell message per iteration
554-
await self.process_one(wait=False)
555-
556-
async def process_one(self, wait=True):
557-
"""Process one request
558-
559-
Returns None if no message was handled.
560-
"""
561-
if wait:
562-
t, dispatch, args = await self.msg_queue.get()
563-
else:
564-
try:
565-
t, dispatch, args = self.msg_queue.get_nowait()
566-
except (asyncio.QueueEmpty, QueueEmpty):
567-
return
568-
569-
if self.control_thread is None and self.control_stream is not None:
570-
# If there isn't a separate control thread then this main thread handles both shell
571-
# and control messages. Before processing a shell message we need to flush all control
572-
# messages and allow them all to be processed.
573-
await asyncio.sleep(0)
574-
self.control_stream.flush()
575-
576-
socket = self.control_stream.socket
577-
while socket.poll(1):
578-
await asyncio.sleep(0)
579-
self.control_stream.flush()
580-
581-
await dispatch(*args)
582-
583-
async def dispatch_queue(self):
584-
"""Coroutine to preserve order of message handling
585-
586-
Ensures that only one message is processing at a time,
587-
even when the handler is async
588-
"""
589-
590-
while True:
591-
try:
592-
await self.process_one()
593-
except Exception:
594-
self.log.exception("Error in message handler")
595-
596-
_message_counter = Any(
597-
help="""Monotonic counter of messages
598-
""",
599-
)
600-
601-
@default("_message_counter")
602-
def _message_counter_default(self):
603-
return itertools.count()
604-
605-
def schedule_dispatch(self, dispatch, *args):
606-
"""schedule a message for dispatch"""
607-
idx = next(self._message_counter)
608-
609-
self.msg_queue.put_nowait(
610-
(
611-
idx,
612-
dispatch,
613-
args,
614-
)
615-
)
616-
# ensure the eventloop wakes up
617-
self.io_loop.add_callback(lambda: None)
618-
619535
async def _create_control_lock(self):
620536
# This can be removed when minimum python increases to 3.10
621537
self._control_lock = asyncio.Lock()
622538

623539
def start(self):
624540
"""register dispatchers for streams"""
625541
self.io_loop = ioloop.IOLoop.current()
626-
self.msg_queue: Queue[t.Any] = Queue()
627-
if not self.shell_channel_thread:
628-
self.io_loop.add_callback(self.dispatch_queue)
629542

630543
if self.control_stream:
631544
self.control_stream.on_recv(self.dispatch_control, copy=False)
@@ -644,10 +557,7 @@ def start(self):
644557
self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False)
645558
else:
646559
self.shell_stream.on_recv(
647-
partial(
648-
self.schedule_dispatch,
649-
self.dispatch_shell,
650-
),
560+
partial(self.shell_main, None),
651561
copy=False,
652562
)
653563

@@ -693,7 +603,6 @@ async def shell_main(self, subshell_id: str | None, msg):
693603
)
694604
else:
695605
assert subshell_id is None
696-
assert threading.current_thread() == self.shell_channel_thread.parent_thread
697606
asyncio_lock = self._main_asyncio_lock
698607

699608
# Whilst executing a shell message, do not accept any other shell messages on the
@@ -1410,23 +1319,18 @@ async def stop_aborting():
14101319
self.log.info("Finishing abort")
14111320
self._aborting = False
14121321

1413-
# put the stop-aborting event on the message queue
1414-
# so that all messages already waiting in the queue are aborted
1415-
# before we reset the flag
1416-
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)
1417-
14181322
if self.stop_on_error_timeout:
14191323
# if we have a delay, give messages this long to arrive on the queue
14201324
# before we stop aborting requests
1421-
self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
1325+
self.io_loop.call_later(self.stop_on_error_timeout, stop_aborting)
14221326
# If we have an eventloop, it may interfere with the call_later above.
14231327
# If the loop has a _schedule_exit method, we call that so the loop exits
14241328
# after stop_on_error_timeout, returning to the main io_loop and letting
14251329
# the call_later fire.
14261330
if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
14271331
self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
14281332
else:
1429-
schedule_stop_aborting()
1333+
self.io_loop.add_callback(stop_aborting)
14301334

14311335
def _send_abort_reply(self, stream, msg, idents):
14321336
"""Send a reply to an aborted request"""

0 commit comments

Comments
 (0)