Commit 3b1d052

Buffered rw (#241)
* Added concurrent RW buffer class and tests
* Migrated ssh client to concurrent rw buffer. Added iteration support to reader. Updated ssh client line parsing.
* Added partial read timeout with and without join test and for updated API calls.
* Updated host output to make stdout/stderr dynamic properties.
* Deprecated reset generators function - now a no-op.
* Added read timeout and encoding attributes to host output.
* Updated changelog, documentation.
1 parent 519d89c commit 3b1d052
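
The "concurrent RW buffer" named in the commit message is not shown in the hunks below. As a rough illustration of the idea only, here is a minimal standard-library sketch, assuming a single in-memory byte buffer with independent read and write offsets guarded by a lock. The class name and methods are hypothetical and are not taken from the library's implementation.

```python
import io
import threading


class ConcurrentRWBuffer:
    """Hypothetical sketch: writers append, readers consume from their own offset."""

    def __init__(self):
        self._buffer = io.BytesIO()
        self._read_pos = 0
        self._write_pos = 0
        self._lock = threading.Lock()

    def write(self, data):
        # Always append at the write offset, regardless of where reads left off.
        with self._lock:
            self._buffer.seek(self._write_pos)
            self._write_pos += self._buffer.write(data)

    def read(self, size=-1):
        # Resume from the read offset; returns b"" when nothing new is buffered.
        with self._lock:
            self._buffer.seek(self._read_pos)
            data = self._buffer.read(size)
            self._read_pos += len(data)
            return data


buf = ConcurrentRWBuffer()
writer = threading.Thread(
    target=lambda: [buf.write(b"line %d\n" % i) for i in range(3)])
writer.start()
writer.join()
print(buf.read())
```

Keeping separate offsets is what lets output be buffered as soon as a command runs while callers read it later, at their own pace.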

File tree

13 files changed

+485
-272
lines changed

Changelog.rst

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,49 @@
11
Change Log
22
============
33

4+
2.3.0
5+
+++++
6+
7+
Changes
8+
-------
9+
10+
* ``SSHClient`` now starts buffering output from remote host, both standard output and standard error, when a command is run.
11+
* ``SSHClient.read_output``, ``SSHClient.read_stderr`` and iterating on stdout/stderr from ``HostOutput`` now read from the internal buffer rather than the SSH channel directly.
12+
* ``ParallelSSHClient.join`` no longer requires ``consume_output`` to be set in order to get exit codes without first reading output.
13+
* ``ParallelSSHClient.join`` with timeout no longer consumes output by default. It is now possible to use ``join`` with a timeout and capture output after ``join`` completes.
14+
* ``ParallelSSHClient.reset_output_generators`` is now a no-op and no longer required to be called after timeouts.
15+
* ``HostOutput.stdout`` and ``stderr`` are now dynamic properties.
16+
* Added ``HostOutput.read_timeout`` attribute. It shows the read timeout in effect when ``run_command`` was called and can be changed to set the timeout used when next reading from ``HostOutput.stdout`` and ``stderr``.
17+
* Added ``HostOutput.encoding`` attribute for the encoding used when ``run_command`` was called. The encoding can be changed before output is next read.
18+
* ``ParallelSSHClient.join`` with timeout no longer affects ``stdout`` or ``stderr`` read timeout set when ``run_command`` was called.
19+
* LibSSH clients under ``pssh.clients.ssh`` now allow output to be read as it becomes available without waiting for remote command to finish first.
20+
* Reading from output behaviour is now consistent across all client types - parallel and single clients under both ``pssh.clients.native`` and ``pssh.clients.ssh``.
21+
* ``ParallelSSHClient.join`` can now be called without arguments and defaults to the last run commands.
22+
* ``ParallelSSHClient.finished`` can now be called without arguments and defaults to the last run commands.
23+
24+
25+
This is now possible:
26+
27+
.. code-block:: python
28+
29+
output = client.run_command(<..>)
30+
client.join(output)
31+
assert output[0].exit_code is not None
32+
33+
As is this:
34+
35+
.. code-block:: python
36+
37+
output = client.run_command(<..>, timeout=1)
38+
client.join(output, timeout=1)
39+
for line in output[0].stdout:
40+
print(line)
41+
42+
Output can be read after ``join`` completes and has a timeout separate from that of ``join``.
43+
44+
See `documentation for more examples on use of timeouts <https://parallel-ssh.readthedocs.io/en/latest/advanced.html#partial-output>`_.
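
To make the "dynamic properties" entry above more concrete, here is a small standard-library sketch of the general pattern: a property that builds a fresh generator on each access and resumes from a shared read position, so a changed ``encoding`` applies to the next read. ``HostOutputSketch`` and its internals are hypothetical stand-ins for illustration, not the library's ``HostOutput`` class.

```python
class HostOutputSketch:
    """Hypothetical stand-in for a host output object backed by a byte buffer."""

    def __init__(self, buffered, encoding="utf-8"):
        self._buffered = buffered  # bytes already received from the host
        self._pos = 0              # shared read position across generators
        self.encoding = encoding   # may be changed between reads

    @property
    def stdout(self):
        # A new generator is created on every attribute access.
        return self._read_lines()

    def _read_lines(self):
        while self._pos < len(self._buffered):
            end = self._buffered.find(b"\n", self._pos)
            if end == -1:
                end = len(self._buffered)
            line = self._buffered[self._pos:end]
            # Advance before yielding so an abandoned generator loses no data.
            self._pos = end + 1
            yield line.decode(self.encoding)


out = HostOutputSketch(b"first\nsecond\nthird\n")
first = next(out.stdout)   # read a single line
rest = list(out.stdout)    # a later access continues where reading stopped
print(first, rest)
```

Because the read position lives on the object rather than in the generator, nothing needs to be reset after an interrupted read in this sketch.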
45+
46+
447
2.2.0
548
+++++
649

doc/advanced.rst

Lines changed: 21 additions & 27 deletions
@@ -188,7 +188,9 @@ See :py:mod:`HostConfig <pssh.config.HostConfig>` for all possible configuration
188188
Join and Output Timeouts
189189
**************************
190190

191-
Clients have timeout functionality on reading output and ``client.join``. Join timeout is a timeout on all parallel commands in total and is separate from ``ParallelSSHClient(timeout=<..>)`` which is applied to SSH session operations individually.
191+
Clients have timeout functionality on reading output and ``client.join``.
192+
193+
Join timeout is applied to all parallel commands in total and is separate from ``ParallelSSHClient(timeout=<..>)`` which is applied to SSH session operations individually.
192194

193195
Timeout exceptions contain attributes for which commands have finished and which have not so client code can get output from any finished commands when handling timeouts.
194196

@@ -207,18 +209,18 @@ The client will raise a ``Timeout`` exception if *all* remote commands have not
207209

208210
.. code-block:: python
209211
210-
output = client.run_command(.., timeout=5)
212+
output = client.run_command(.., read_timeout=5)
211213
for host_out in output:
212214
try:
213215
for line in host_out.stdout:
214-
pass
216+
print(line)
215217
for line in host_out.stderr:
216-
pass
218+
print(line)
217219
except Timeout:
218220
pass
219221
220222
221-
In the case of reading from output such as in the example above, timeout value is per output stream - meaning separate timeouts for stdout and stderr as well as separate timeout per host remote command.
223+
In the case of reading from output such as in the example above, timeout value is per output stream - meaning separate timeouts for stdout and stderr as well as separate timeout per host output.
222224

223225
*New in 1.5.0*
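
The per-stream behaviour described above can be simulated without any SSH connection. The sketch below assumes each stream is a ``queue.Queue`` of lines ending with a ``None`` sentinel, and that the timeout applies to each wait for new output. ``ReadTimeout`` and ``read_lines`` are hypothetical names used only for this illustration.

```python
import queue


class ReadTimeout(Exception):
    """Hypothetical stand-in for the client's Timeout exception."""


def read_lines(stream, read_timeout):
    """Yield lines until a None sentinel; raise if no line arrives in time."""
    while True:
        try:
            line = stream.get(timeout=read_timeout)
        except queue.Empty:
            raise ReadTimeout() from None
        if line is None:
            return
        yield line


# stdout finishes normally; stderr stalls after one line.
stdout, stderr = queue.Queue(), queue.Queue()
for item in ("out 1", "out 2", None):
    stdout.put(item)
stderr.put("err 1")

for name, stream in (("stdout", stdout), ("stderr", stderr)):
    try:
        for line in read_lines(stream, read_timeout=0.05):
            print(name, line)
    except ReadTimeout:
        print(name, "timed out")
```

Each stream waits on its own queue, so a stalled stderr does not affect how long stdout is allowed to take.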
224226

@@ -253,6 +255,7 @@ In the above example, output is printed only for those commands which have compl
253255

254256
Client code may choose to then join again only on the unfinished output if some commands have failed in order to gather remaining output.
255257

258+
.. _partial-output:
256259

257260
Reading Partial Output of Commands That Do Not Terminate
258261
==========================================================
@@ -262,17 +265,16 @@ In some cases, such as when the remote command never terminates unless interrupt
262265
.. code-block:: python
263266
264267
output = client.run_command(
265-
'tail -f /var/log/messages', use_pty=True, timeout=1)
268+
'while true; do echo a line; sleep .1; done', use_pty=True, read_timeout=1)
266269
267-
# Read as many lines of output as server has sent before the timeout
270+
# Read as many lines of output as hosts have sent before the timeout
268271
stdout = []
269272
for host_out in output:
270273
try:
271274
for line in host_out.stdout:
272275
stdout.append(line)
273276
except Timeout:
274-
# This allows client code to continue to read output after timeout
275-
client.reset_output_generators(host_out, timeout=1)
277+
pass
276278
277279
# Closing channel which has PTY has the effect of terminating
278280
# any running processes started on that channel.
@@ -281,18 +283,20 @@ In some cases, such as when the remote command never terminates unless interrupt
281283
# Join is not strictly needed here as channel has already been closed and
282284
# command has finished, but is safe to use regardless.
283285
client.join(output)
286+
# Can now read output up to when the channel was closed without blocking.
287+
rest_of_stdout = list(output[0].stdout)
284288
285289
Without a PTY, a ``join`` call with a timeout will complete with timeout exception raised but the remote process will be left running as per SSH protocol specifications.
286290

287-
Furthermore, once reading output has timed out, it is necessary to restart the output generators as by Python design they only iterate once. This is done by ``client.reset_output_generators`` in the above example.
291+
.. note::
288292

289-
Generator reset is also performed automatically by calls to ``join`` and does not need to be done manually when ``join`` is used after output reading.
293+
Read timeout may be changed after ``run_command`` has been called by changing ``HostOutput.read_timeout`` for that particular host output.
290294

291295
.. note::
292296

293-
``join`` with a timeout forces output to be consumed as otherwise the pending output will keep the channel open and make it appear as if command has not yet finished.
297+
When output from commands is not needed, it is best to use ``client.join(consume_output=True)`` so that output buffers are consumed automatically.
294298

295-
To capture output when using ``join`` with a timeout, gather output first before calling ``join``, making use of output timeout as well, and/or make use of :ref:`host logger` functionality.
299+
If output is not read, or automatically consumed by ``join``, output buffers will continue to grow, increasing memory consumption while the client is running, though memory use rises very slowly.
296300

297301

298302
Per-Host Configuration
@@ -320,19 +324,7 @@ In the above example, the client is configured to connect to hostname ``localhos
320324

321325
When using ``host_config``, the number of ``HostConfig`` entries must match the number of hosts in ``client.hosts``. An exception is raised on client initialisation if not.
322326

323-
324-
.. note::
325-
326-
Currently only ``port``, ``user``, ``password`` and ``private_key`` ``HostConfig`` values are used.
327-
328-
329-
.. note::
330-
331-
Proxy host configuration is currently per ``ParallelSSHClient`` and cannot yet be provided via per-host configuration.
332-
Multiple clients can be used to make use of multiple proxy hosts.
333-
334-
This feature will be provided in future releases.
335-
327+
As of `2.2.0`, proxy configuration can also be provided in ``HostConfig``.
336328

337329
.. _per-host-cmds:
338330

@@ -586,7 +578,7 @@ For example, to copy the local files ``['local_file_1', 'local_file_2']`` as rem
586578
587579
The client will copy ``local_file_1`` to ``host1`` as ``remote_file_1`` and ``local_file_2`` to ``host2`` as ``remote_file_2``.
588580

589-
Each item in ``copy_args`` list should be a dictionary as shown above. Number of ``copy_args`` must match length of ``client.hosts`` if provided or exception will be raised.
581+
Each item in ``copy_args`` list should be a dictionary as shown above. Number of items in ``copy_args`` must match length of ``client.hosts`` if provided or exception will be raised.
590582

591583
``copy_remote_file``, ``scp_send`` and ``scp_recv`` may all be used in the same manner to configure remote and local file names per host.
592584

@@ -605,6 +597,7 @@ If wanting to copy a file from a single remote host and retain the original file
605597
606598
client = SSHClient('localhost')
607599
client.copy_remote_file('remote_filename', 'local_filename')
600+
client.scp_recv('remote_filename', 'local_filename')
608601
609602
.. seealso::
610603

@@ -653,6 +646,7 @@ Hosts list can be modified in place. A call to ``run_command`` will create new c
653646
654647
client.hosts = ['otherhost']
655648
print(client.run_command('exit 0'))
649+
<..>
656650
host='otherhost'
657651
exit_code=None
658652
<..>

doc/quickstart.rst

Lines changed: 2 additions & 9 deletions
@@ -117,23 +117,16 @@ Standard output, aka ``stdout``, for a given :py:class:`HostOutput <pssh.output.
117117
<line by line output>
118118
<..>
119119
120-
There is nothing special needed to ensure output is available.
121-
122-
Please note that retrieving all of a command's standard output by definition requires that the command has completed.
123-
124-
Iterating over ``stdout`` for any host *to completion* will therefore *only complete* when that host's command has completed unless interrupted.
120+
Iterating over ``stdout`` will only end when the remote command has finished unless interrupted.
125121

126122
The ``timeout`` keyword argument to ``run_command`` may be used to cause output generators to timeout if no output is received after the given number of seconds - see `join and output timeouts <advanced.html#join-and-output-timeouts>`_.
127123

128-
``stdout`` is a generator. Iterating over it will consume the remote standard output stream via the network as it becomes available. To retrieve all of stdout can wrap it with list, per below.
124+
``stdout`` is a generator. To retrieve all of stdout, wrap it with ``list``, per below.
129125

130126
.. code-block:: python
131127
132128
stdout = list(host_out.stdout)
133129
134-
.. warning::
135-
136-
This will store the entirety of stdout into memory.
137130
138131
All hosts iteration
139132
-------------------

pssh/clients/base/parallel.py

Lines changed: 23 additions & 53 deletions
@@ -21,6 +21,7 @@
2121

2222
import gevent.pool
2323

24+
from warnings import warn
2425
from gevent import joinall, spawn, Timeout as GTimeout
2526
from gevent.hub import Hub
2627

@@ -123,27 +124,16 @@ def _get_output_from_greenlet(self, cmd, raise_error=False):
123124
ex = Timeout()
124125
if raise_error:
125126
raise ex
126-
return HostOutput(host, None, None, None, None,
127-
None, exception=ex)
127+
return HostOutput(host, None, None, None,
128+
exception=ex)
128129

129130
def get_last_output(self, cmds=None, timeout=None,
130131
return_list=True):
131-
"""Get output for last commands executed by ``run_command``
132+
"""Get output for last commands executed by ``run_command``.
132133
133134
:param cmds: Commands to get output for. Defaults to ``client.cmds``
134135
:type cmds: list(:py:class:`gevent.Greenlet`)
135-
:param greenlet_timeout: (Optional) Greenlet timeout setting.
136-
Defaults to no timeout. If set, this function will raise
137-
:py:class:`gevent.Timeout` after ``greenlet_timeout`` seconds
138-
if no result is available from greenlets.
139-
In some cases, such as when using proxy hosts, connection timeout
140-
is controlled by proxy server and getting result from greenlets may
141-
hang indefinitely if remote server is unavailable. Use this setting
142-
to avoid blocking in such circumstances.
143-
Note that ``gevent.Timeout`` is a special class that inherits from
144-
``BaseException`` and thus **can not be caught** by
145-
``stop_on_errors=False``.
146-
:type greenlet_timeout: float
136+
:param timeout: No-op - to be removed.
147137
:param return_list: No-op - list of ``HostOutput`` always returned.
148138
Parameter kept for backwards compatibility - to be removed in future
149139
releases.
@@ -161,32 +151,9 @@ def get_last_output(self, cmds=None, timeout=None,
161151
def reset_output_generators(self, host_out, timeout=None,
162152
client=None, channel=None,
163153
encoding='utf-8'):
164-
"""Reset output generators for host output.
165-
166-
:param host_out: Host output
167-
:type host_out: :py:class:`pssh.output.HostOutput`
168-
:param client: (Optional) SSH client
169-
:type client: :py:class:`pssh.ssh2_client.SSHClient`
170-
:param channel: (Optional) SSH channel
171-
:type channel: :py:class:`ssh2.channel.Channel`
172-
:param timeout: (Optional) Timeout setting
173-
:type timeout: int
174-
:param encoding: (Optional) Encoding to use for output. Must be valid
175-
`Python codec <https://docs.python.org/library/codecs.html>`_
176-
:type encoding: str
177-
178-
:rtype: tuple(stdout, stderr)
179-
"""
180-
channel = host_out.channel if channel is None else channel
181-
client = host_out.client if client is None else client
182-
stdout = client.read_output_buffer(
183-
client.read_output(channel, timeout=timeout), encoding=encoding)
184-
stderr = client.read_output_buffer(
185-
client.read_stderr(channel, timeout=timeout),
186-
prefix='\t[err]', encoding=encoding)
187-
host_out.stdout = stdout
188-
host_out.stderr = stderr
189-
return stdout, stderr
154+
"""No-op - to be removed."""
155+
warn("This function is a no-op and deprecated. "
156+
"It will be removed in future releases")
190157

191158
def _get_host_config_values(self, host_i, host):
192159
if self.host_config is None:
@@ -251,7 +218,7 @@ def _consume_output(self, stdout, stderr):
251218
for line in stderr:
252219
pass
253220

254-
def join(self, output, consume_output=False, timeout=None,
221+
def join(self, output=None, consume_output=False, timeout=None,
255222
encoding='utf-8'):
256223
"""Wait until all remote commands in output have finished.
257224
Does *not* block other commands from running in parallel.
@@ -264,9 +231,7 @@ def join(self, output, consume_output=False, timeout=None,
264231
output on call to ``join`` when host logger has been enabled.
265232
:type consume_output: bool
266233
:param timeout: Timeout in seconds if **all** remote commands are not
267-
yet finished. Note that use of timeout forces ``consume_output=True``
268-
otherwise the channel output pending to be consumed always results
269-
in the channel not being finished.
234+
yet finished.
270235
This function's timeout is for all commands in total and will therefore
271236
be affected by pool size and total number of concurrent commands in
272237
self.pool.
@@ -281,7 +246,9 @@ def join(self, output, consume_output=False, timeout=None,
281246
reached with commands still running.
282247
283248
:rtype: ``None``"""
284-
if not isinstance(output, list):
249+
if output is None:
250+
output = self.get_last_output()
251+
elif not isinstance(output, list):
285252
raise ValueError("Unexpected output object type")
286253
cmds = [self.pool.spawn(self._join, host_out, timeout=timeout,
287254
consume_output=consume_output, encoding=encoding)
@@ -305,21 +272,24 @@ def _join(self, host_out, consume_output=False, timeout=None,
305272
client = host_out.client
306273
if client is None:
307274
return
308-
stdout, stderr = self.reset_output_generators(
309-
host_out, channel=channel, timeout=timeout,
310-
encoding=encoding)
311275
client.wait_finished(channel, timeout=timeout)
312276
if consume_output:
313-
self._consume_output(stdout, stderr)
277+
self._consume_output(host_out.stdout, host_out.stderr)
314278
return host_out
315279

316-
def finished(self, output):
317-
"""Check if commands have finished without blocking
280+
def finished(self, output=None):
281+
"""Check if commands have finished without blocking.
318282
319283
:param output: As returned by
320-
:py:func:`pssh.pssh_client.ParallelSSHClient.get_output`
284+
:py:func:`pssh.pssh_client.ParallelSSHClient.get_last_output`
285+
:type output: list
286+
321287
:rtype: bool
322288
"""
289+
if output is None:
290+
output = self.get_last_output()
291+
if output is None:
292+
return True
323293
for host_out in output:
324294
chan = host_out.channel
325295
if host_out.client and not host_out.client.finished(chan):
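
Two patterns from the hunks above, an ``output=None`` argument that falls back to the last output and a deprecated method kept as a warning no-op, can be sketched in isolation as follows. ``ClientSketch`` is a hypothetical class for illustration. It uses ``DeprecationWarning`` with ``stacklevel=2``, which is a choice made here, whereas the diff calls ``warn`` with its default category.

```python
import warnings


class ClientSketch:
    """Hypothetical client that remembers the output of its last run."""

    def __init__(self):
        self._last_output = None

    def run(self, results):
        # Each item stands in for "has this host's command finished?".
        self._last_output = list(results)
        return self._last_output

    def finished(self, output=None):
        if output is None:
            output = self._last_output
        if output is None:
            return True  # nothing has been run yet
        return all(output)

    def reset_output_generators(self, *args, **kwargs):
        # Kept only for backwards compatibility; does nothing.
        warnings.warn(
            "This function is a no-op and deprecated. "
            "It will be removed in future releases",
            DeprecationWarning, stacklevel=2)


client = ClientSketch()
print(client.finished())       # no commands run yet
client.run([True, False])
print(client.finished())       # falls back to the last output
```

Accepting and ignoring arbitrary arguments in the no-op keeps existing call sites working until the method is removed.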
