@@ -883,7 +883,7 @@ def write(self, data: bytes) -> None:
883
883
expectation , responses = self .expectations .popleft ()
884
884
assert expectation == line , f"expected { expectation } , got: { line } "
885
885
if responses :
886
- self .protocol .pipe_data_received ( 1 , "\n " .join (responses + ["" ]).encode ("utf-8" ))
886
+ self .protocol .loop . call_soon ( self . protocol . pipe_data_received , 1 , "\n " .join (responses + ["" ]).encode ("utf-8" ))
887
887
888
888
def get_pid (self ) -> int :
889
889
return id (self )
@@ -934,12 +934,12 @@ def connection_lost(self, exc: Optional[Exception]) -> None:
934
934
LOGGER .debug ("%s: Connection lost (exit code: %d, error: %s)" , self , code , exc )
935
935
936
936
# Terminate commands.
937
- if self .command is not None :
938
- self .command . _engine_terminated ( code )
939
- self . command = None
940
- if self . next_command is not None :
941
- self . next_command . _engine_terminated ( code )
942
- self . next_command = None
937
+ command , self .command = self . command , None
938
+ next_command , self . next_command = self .next_command , None
939
+ if command :
940
+ command . _engine_terminated ( code )
941
+ if next_command :
942
+ next_command . _engine_terminated ( code )
943
943
944
944
self .returncode .set_result (code )
945
945
@@ -965,9 +965,9 @@ def pipe_data_received(self, fd: int, data: Union[bytes, str]) -> None:
965
965
LOGGER .warning ("%s: >> %r (%s)" , self , bytes (line_bytes ), err )
966
966
else :
967
967
if fd == 1 :
968
- self .loop . call_soon ( self . _line_received , line )
968
+ self ._line_received ( line )
969
969
else :
970
- self .loop . call_soon ( self . error_line_received , line )
970
+ self .error_line_received ( line )
971
971
972
972
def error_line_received (self , line : str ) -> None :
973
973
LOGGER .warning ("%s: stderr >> %s" , self , line )
@@ -998,7 +998,7 @@ async def communicate(self, command_factory: Callable[[Self], BaseCommand[T]]) -
998
998
999
999
self .next_command = command
1000
1000
1001
- def previous_command_finished (_ : Optional [ asyncio . Future [ None ]] ) -> None :
1001
+ def previous_command_finished () -> None :
1002
1002
self .command , self .next_command = self .next_command , None
1003
1003
if self .command is not None :
1004
1004
cmd = self .command
@@ -1008,11 +1008,11 @@ def cancel_if_cancelled(result: asyncio.Future[T]) -> None:
1008
1008
cmd ._cancel ()
1009
1009
1010
1010
cmd .result .add_done_callback (cancel_if_cancelled )
1011
- cmd .finished .add_done_callback (previous_command_finished )
1012
1011
cmd ._start ()
1012
+ cmd .add_finished_callback (previous_command_finished )
1013
1013
1014
1014
if self .command is None :
1015
- previous_command_finished (None )
1015
+ previous_command_finished ()
1016
1016
elif not self .command .result .done ():
1017
1017
self .command .result .cancel ()
1018
1018
elif not self .command .result .cancelled ():
@@ -1228,13 +1228,25 @@ def __init__(self, engine: Protocol) -> None:
1228
1228
self .result : asyncio .Future [T ] = asyncio .Future ()
1229
1229
self .finished : asyncio .Future [None ] = asyncio .Future ()
1230
1230
1231
+ self ._finished_callbacks : List [Callable [[], None ]] = []
1232
+
1233
+ def add_finished_callback (self , callback : Callable [[], None ]) -> None :
1234
+ self ._finished_callbacks .append (callback )
1235
+ self ._dispatch_finished ()
1236
+
1237
+ def _dispatch_finished (self ) -> None :
1238
+ if self .finished .done ():
1239
+ while self ._finished_callbacks :
1240
+ self ._finished_callbacks .pop ()()
1241
+
1231
1242
def _engine_terminated (self , code : int ) -> None :
1232
1243
hint = ", binary not compatible with cpu?" if code in [- 4 , 0xc000001d ] else ""
1233
1244
exc = EngineTerminatedError (f"engine process died unexpectedly (exit code: { code } { hint } )" )
1234
1245
if self .state == CommandState .ACTIVE :
1235
1246
self .engine_terminated (exc )
1236
1247
elif self .state == CommandState .CANCELLING :
1237
1248
self .finished .set_result (None )
1249
+ self ._dispatch_finished ()
1238
1250
elif self .state == CommandState .NEW :
1239
1251
self ._handle_exception (exc )
1240
1252
@@ -1251,13 +1263,15 @@ def _handle_exception(self, exc: Exception) -> None:
1251
1263
1252
1264
if not self .finished .done ():
1253
1265
self .finished .set_result (None )
1266
+ self ._dispatch_finished ()
1254
1267
1255
1268
def set_finished (self ) -> None :
1256
1269
assert self .state in [CommandState .ACTIVE , CommandState .CANCELLING ], self .state
1257
1270
if not self .result .done ():
1258
1271
self .result .set_exception (EngineError (f"engine command finished before returning result: { self !r} " ))
1259
- self .finished .set_result (None )
1260
1272
self .state = CommandState .DONE
1273
+ self .finished .set_result (None )
1274
+ self ._dispatch_finished ()
1261
1275
1262
1276
def _cancel (self ) -> None :
1263
1277
if self .state != CommandState .CANCELLING and self .state != CommandState .DONE :
0 commit comments