15
15
from trio ._util import final
16
16
17
17
18
+ class SuppressDecorator (contextlib .ContextDecorator , contextlib .suppress ):
19
+ pass
20
+
21
+
18
22
@final
19
23
class TrioInteractiveConsole (InteractiveConsole ):
20
24
# code.InteractiveInterpreter defines locals as Mapping[str, Any]
21
25
# but when we pass this to FunctionType it expects a dict. So
22
26
# we make the type more specific on our subclass
23
27
locals : dict [str , object ]
24
28
25
- def __init__ (self , repl_locals : dict [str , object ] | None = None ) -> None :
29
+ def __init__ (
30
+ self , repl_locals : dict [str , object ], trio_token : trio .lowlevel .TrioToken
31
+ ) -> None :
26
32
super ().__init__ (locals = repl_locals )
33
+ self .trio_token = trio_token
27
34
self .compile .compiler .flags |= ast .PyCF_ALLOW_TOP_LEVEL_AWAIT
28
35
29
36
def runcode (self , code : types .CodeType ) -> None :
@@ -70,17 +77,24 @@ def raw_input(self, prompt: str = "") -> str:
70
77
else :
71
78
72
79
def raw_input (self , prompt : str = "" ) -> str :
73
- import fcntl
74
- import termios
75
80
from signal import SIGINT , signal
76
81
77
82
interrupted = False
78
83
84
+ @SuppressDecorator (KeyboardInterrupt )
85
+ @trio .lowlevel .disable_ki_protection
86
+ def newline ():
87
+ import fcntl
88
+ import termios
89
+
90
+ # Fake up a newline char as if user had typed it at
91
+ # os.write(sys.stdin.buffer.fileno(), b"\n")
92
+ fcntl .ioctl (sys .stdin , termios .TIOCSTI , b"\n " )
93
+
79
94
def handler (sig : int , frame : types .FrameType | None ) -> None :
80
95
nonlocal interrupted
81
96
interrupted = True
82
- # Fake up a newline char as if user had typed it at terminal
83
- fcntl .ioctl (sys .stdin , termios .TIOCSTI , b"\n " )
97
+ self .trio_token .run_sync_soon (newline , idempotent = True )
84
98
85
99
prev_handler = trio .from_thread .run_sync (signal , SIGINT , handler )
86
100
try :
@@ -91,14 +105,16 @@ def handler(sig: int, frame: types.FrameType | None) -> None:
91
105
raise KeyboardInterrupt
92
106
93
107
94
- async def run_repl (console : TrioInteractiveConsole ) -> None :
108
+ async def run_repl (repl_locals : dict [ str , object ] ) -> None :
95
109
banner = (
96
110
f"trio REPL { sys .version } on { sys .platform } \n "
97
111
f'Use "await" directly instead of "trio.run()".\n '
98
112
f'Type "help", "copyright", "credits" or "license" '
99
113
f"for more information.\n "
100
114
f'{ getattr (sys , "ps1" , ">>> " )} import trio'
101
115
)
116
+ token = trio .lowlevel .current_trio_token ()
117
+ console = TrioInteractiveConsole (repl_locals , token )
102
118
try :
103
119
await trio .to_thread .run_sync (console .interact , banner )
104
120
finally :
@@ -124,5 +140,4 @@ def main(original_locals: dict[str, object]) -> None:
124
140
}:
125
141
repl_locals [key ] = original_locals [key ]
126
142
127
- console = TrioInteractiveConsole (repl_locals )
128
- trio .run (run_repl , console )
143
+ trio .run (run_repl , repl_locals )
0 commit comments