Skip to content

Commit f2020ed

Browse files
committed
Add flow control in AsyncSSH redirection to StreamWriter objects
This commit adds support for flow control when an asyncio.StreamWriter is passed to AsyncSSH's process redirect feature. Previously, writes were allowed indefinitely, potentially building up a large buffer in memory if data was being sent faster than it was being consumed. Thanks go to Benjy Wiener for reporting this issue and supplying a test script to reproduce it.
1 parent c3dc869 commit f2020ed

File tree

2 files changed

+56
-5
lines changed

2 files changed

+56
-5
lines changed

asyncssh/process.py

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -571,24 +571,52 @@ def close(self) -> None:
571571
class _StreamWriter(_UnicodeWriter[AnyStr]):
572572
"""Forward data to an asyncio stream"""
573573

574-
def __init__(self, writer: asyncio.StreamWriter,
574+
def __init__(self, process: 'SSHProcess[AnyStr]',
575+
writer: asyncio.StreamWriter, recv_eof: bool,
575576
encoding: Optional[str], errors: str):
576577
super().__init__(encoding, errors)
577578

579+
self._process: 'SSHProcess[AnyStr]' = process
578580
self._writer = writer
581+
self._recv_eof = recv_eof
582+
self._queue: asyncio.Queue[Optional[AnyStr]] = asyncio.Queue()
583+
self._write_task: Optional[asyncio.Task[None]] = \
584+
process.channel.get_connection().create_task(self._feed())
585+
586+
async def _feed(self) -> None:
587+
"""Feed data to the stream"""
588+
589+
while True:
590+
data = await self._queue.get()
591+
592+
if data is None:
593+
self._queue.task_done()
594+
break
595+
596+
self._writer.write(self.encode(data))
597+
await self._writer.drain()
598+
self._queue.task_done()
599+
600+
if self._recv_eof:
601+
self._writer.write_eof()
579602

580603
def write(self, data: AnyStr) -> None:
581604
"""Write data to the stream"""
582605

583-
self._writer.write(self.encode(data))
606+
self._queue.put_nowait(data)
584607

585608
def write_eof(self) -> None:
586609
"""Write EOF to the stream"""
587610

588-
self._writer.write_eof()
611+
self.close()
589612

590613
def close(self) -> None:
591-
"""Ignore close -- the caller must clean up the associated transport"""
614+
"""Stop forwarding data to the stream"""
615+
616+
if self._write_task:
617+
self._write_task = None
618+
self._queue.put_nowait(None)
619+
self._process.add_cleanup_task(self._queue.join())
592620

593621

594622
class _DevNullWriter(_WriterProtocol[AnyStr]):
@@ -925,7 +953,8 @@ def pipe_factory() -> _PipeWriter:
925953
writer_process.set_reader(reader, send_eof, writer_datatype)
926954
writer = _ProcessWriter[AnyStr](writer_process, writer_datatype)
927955
elif isinstance(target, asyncio.StreamWriter):
928-
writer = _StreamWriter(target, self._encoding, self._errors)
956+
writer = _StreamWriter(self, target, recv_eof,
957+
self._encoding, self._errors)
929958
else:
930959
file: _File
931960
needs_close = True

tests/test_process.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,28 @@ async def test_stdout_stream(self):
961961

962962
self.assertEqual(stdout_data, data.encode('ascii'))
963963

964+
@unittest.skipIf(sys.platform == 'win32',
965+
'skip asyncio.subprocess tests on Windows')
966+
@asynctest
967+
async def test_stdout_stream_keep_open(self):
968+
"""Test with stdout redirected to asyncio stream which remains open"""
969+
970+
data = str(id(self))
971+
972+
async with self.connect() as conn:
973+
proc2 = await asyncio.create_subprocess_shell(
974+
'cat', stdin=asyncio.subprocess.PIPE,
975+
stdout=asyncio.subprocess.PIPE)
976+
977+
await conn.run('echo', input=data, stdout=proc2.stdin,
978+
stderr=asyncssh.DEVNULL, recv_eof=False)
979+
await conn.run('echo', input=data, stdout=proc2.stdin,
980+
stderr=asyncssh.DEVNULL)
981+
982+
stdout_data = await proc2.stdout.read()
983+
984+
self.assertEqual(stdout_data, 2*data.encode('ascii'))
985+
964986
@asynctest
965987
async def test_change_stdout(self):
966988
"""Test changing stdout of an open process"""

0 commit comments

Comments
 (0)