Skip to content

Commit 95756fa

Browse files
committed
Change default on SFTP max_requests to avoid excessive memory usage
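This commit changes the way the default number of parallel SFTP requests is determined. Instead of always defaulting to 128, the default is now computed from the block size as MAX_SFTP_READ_LEN // block_size, clamped to the range 16 to 128, so lower values are used for large block sizes, to reduce memory usage. With larger block sizes, there's no need for as many parallel requests to keep the pipe full. The minimum default is now 16, for block sizes of 256 KB or more. The maximum default is 128, for block sizes of 32 KB or below. The default is selected whenever max_requests is zero or negative (the new default argument value is -1); an explicitly passed positive max_requests is still used as given. For an open file whose read length is zero, the default falls back to a single request.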
1 parent ebd5f33 commit 95756fa

File tree

2 files changed

+64
-13
lines changed

2 files changed

+64
-13
lines changed

asyncssh/sftp.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3165,14 +3165,22 @@ def __init__(self, handler: SFTPClientHandler, handle: bytes,
31653165
self._appending = appending
31663166
self._encoding = encoding
31673167
self._errors = errors
3168-
self._max_requests = max_requests
31693168
self._offset = None if appending else 0
31703169

31713170
self.read_len = \
31723171
handler.limits.max_read_len if block_size == -1 else block_size
31733172
self.write_len = \
31743173
handler.limits.max_write_len if block_size == -1 else block_size
31753174

3175+
if max_requests <= 0:
3176+
if self.read_len:
3177+
max_requests = max(16, min(MAX_SFTP_READ_LEN //
3178+
self.read_len, 128))
3179+
else:
3180+
max_requests = 1
3181+
3182+
self._max_requests = max_requests
3183+
31763184
async def __aenter__(self) -> Self:
31773185
"""Allow SFTPClientFile to be used as an async context manager"""
31783186

@@ -3859,6 +3867,9 @@ async def _begin_copy(self, srcfs: _SFTPFSProtocol, dstfs: _SFTPFSProtocol,
38593867
block_size = min(srcfs.limits.max_read_len,
38603868
dstfs.limits.max_write_len)
38613869

3870+
if max_requests <= 0:
3871+
max_requests = max(16, min(MAX_SFTP_READ_LEN // block_size, 128))
3872+
38623873
if isinstance(srcpaths, (bytes, str, PurePath)):
38633874
srcpaths = [srcpaths]
38643875
elif not isinstance(srcpaths, list):
@@ -3916,7 +3927,7 @@ async def get(self, remotepaths: _SFTPPaths,
39163927
localpath: Optional[_SFTPPath] = None, *,
39173928
preserve: bool = False, recurse: bool = False,
39183929
follow_symlinks: bool = False, block_size: int = -1,
3919-
max_requests: int = _MAX_SFTP_REQUESTS,
3930+
max_requests: int = -1,
39203931
progress_handler: SFTPProgressHandler = None,
39213932
error_handler: SFTPErrorHandler = None) -> None:
39223933
"""Download remote files
@@ -3957,7 +3968,9 @@ async def get(self, remotepaths: _SFTPPaths,
39573968
doesn't advertise limits.
39583969
39593970
The max_requests argument specifies the maximum number of
3960-
parallel read or write requests issued, defaulting to 128.
3971+
parallel read or write requests issued, defaulting to a
3972+
value between 16 and 128 depending on the selected block
3973+
size to avoid excessive memory usage.
39613974
39623975
If progress_handler is specified, it will be called after
39633976
each block of a file is successfully downloaded. The arguments
@@ -4022,7 +4035,7 @@ async def put(self, localpaths: _SFTPPaths,
40224035
remotepath: Optional[_SFTPPath] = None, *,
40234036
preserve: bool = False, recurse: bool = False,
40244037
follow_symlinks: bool = False, block_size: int = -1,
4025-
max_requests: int = _MAX_SFTP_REQUESTS,
4038+
max_requests: int = -1,
40264039
progress_handler: SFTPProgressHandler = None,
40274040
error_handler: SFTPErrorHandler = None) -> None:
40284041
"""Upload local files
@@ -4063,7 +4076,9 @@ async def put(self, localpaths: _SFTPPaths,
40634076
doesn't advertise limits.
40644077
40654078
The max_requests argument specifies the maximum number of
4066-
parallel read or write requests issued, defaulting to 128.
4079+
parallel read or write requests issued, defaulting to a
4080+
value between 16 and 128 depending on the selected block
4081+
size to avoid excessive memory usage.
40674082
40684083
If progress_handler is specified, it will be called after
40694084
each block of a file is successfully uploaded. The arguments
@@ -4128,7 +4143,7 @@ async def copy(self, srcpaths: _SFTPPaths,
41284143
dstpath: Optional[_SFTPPath] = None, *,
41294144
preserve: bool = False, recurse: bool = False,
41304145
follow_symlinks: bool = False, block_size: int = -1,
4131-
max_requests: int = _MAX_SFTP_REQUESTS,
4146+
max_requests: int = -1,
41324147
progress_handler: SFTPProgressHandler = None,
41334148
error_handler: SFTPErrorHandler = None,
41344149
remote_only: bool = False) -> None:
@@ -4170,7 +4185,9 @@ async def copy(self, srcpaths: _SFTPPaths,
41704185
doesn't advertise limits.
41714186
41724187
The max_requests argument specifies the maximum number of
4173-
parallel read or write requests issued, defaulting to 128.
4188+
parallel read or write requests issued, defaulting to a
4189+
value between 16 and 128 depending on the selected block
4190+
size to avoid excessive memory usage.
41744191
41754192
If progress_handler is specified, it will be called after
41764193
each block of a file is successfully copied. The arguments
@@ -4238,7 +4255,7 @@ async def mget(self, remotepaths: _SFTPPaths,
42384255
localpath: Optional[_SFTPPath] = None, *,
42394256
preserve: bool = False, recurse: bool = False,
42404257
follow_symlinks: bool = False, block_size: int = -1,
4241-
max_requests: int = _MAX_SFTP_REQUESTS,
4258+
max_requests: int = -1,
42424259
progress_handler: SFTPProgressHandler = None,
42434260
error_handler: SFTPErrorHandler = None) -> None:
42444261
"""Download remote files with glob pattern match
@@ -4261,7 +4278,7 @@ async def mput(self, localpaths: _SFTPPaths,
42614278
remotepath: Optional[_SFTPPath] = None, *,
42624279
preserve: bool = False, recurse: bool = False,
42634280
follow_symlinks: bool = False, block_size: int = -1,
4264-
max_requests: int = _MAX_SFTP_REQUESTS,
4281+
max_requests: int = -1,
42654282
progress_handler: SFTPProgressHandler = None,
42664283
error_handler: SFTPErrorHandler = None) -> None:
42674284
"""Upload local files with glob pattern match
@@ -4284,7 +4301,7 @@ async def mcopy(self, srcpaths: _SFTPPaths,
42844301
dstpath: Optional[_SFTPPath] = None, *,
42854302
preserve: bool = False, recurse: bool = False,
42864303
follow_symlinks: bool = False, block_size: int = -1,
4287-
max_requests: int = _MAX_SFTP_REQUESTS,
4304+
max_requests: int = -1,
42884305
progress_handler: SFTPProgressHandler = None,
42894306
error_handler: SFTPErrorHandler = None,
42904307
remote_only: bool = False) -> None:
@@ -4586,7 +4603,7 @@ async def open(self, path: _SFTPPath,
45864603
attrs: SFTPAttrs = SFTPAttrs(),
45874604
encoding: Optional[str] = 'utf-8', errors: str = 'strict',
45884605
block_size: int = -1,
4589-
max_requests: int = _MAX_SFTP_REQUESTS) -> SFTPClientFile:
4606+
max_requests: int = -1) -> SFTPClientFile:
45904607
"""Open a remote file
45914608
45924609
This method opens a remote file and returns an
@@ -4662,7 +4679,9 @@ async def open(self, path: _SFTPPath,
46624679
default of using the server-advertised limits.
46634680
46644681
The max_requests argument specifies the maximum number of
4665-
parallel read or write requests issued, defaulting to 128.
4682+
parallel read or write requests issued, defaulting to a
4683+
value between 16 and 128 depending on the selected block
4684+
size to avoid excessive memory usage.
46664685
46674686
:param path:
46684687
The name of the remote file to open
@@ -4718,7 +4737,7 @@ async def open56(self, path: _SFTPPath,
47184737
attrs: SFTPAttrs = SFTPAttrs(),
47194738
encoding: Optional[str] = 'utf-8', errors: str = 'strict',
47204739
block_size: int = -1,
4721-
max_requests: int = _MAX_SFTP_REQUESTS) -> SFTPClientFile:
4740+
max_requests: int = -1) -> SFTPClientFile:
47224741
"""Open a remote file using SFTP v5/v6 flags
47234742
47244743
This method is very similar to :meth:`open`, but the pflags_or_mode

tests/test_sftp.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,21 @@ async def test_copy(self, sftp):
748748
finally:
749749
remove('src dst')
750750

751+
@sftp_test
752+
async def test_copy_max_requests(self, sftp):
753+
"""Test copying a file over SFTP with max requests set"""
754+
755+
for method in ('get', 'put', 'copy'):
756+
for src in ('src', b'src', Path('src')):
757+
with self.subTest(method=method, src=type(src)):
758+
try:
759+
self._create_file('src', 16*1024*1024*'\0')
760+
await getattr(sftp, method)(src, 'dst',
761+
max_requests=4)
762+
self._check_file('src', 'dst')
763+
finally:
764+
remove('src dst')
765+
751766
def test_copy_non_remote(self):
752767
"""Test copying without using remote_copy function"""
753768

@@ -2305,6 +2320,23 @@ async def test_open_read_parallel(self, sftp):
23052320

23062321
remove('file')
23072322

2323+
@sftp_test
2324+
async def test_open_read_max_requests(self, sftp):
2325+
"""Test reading data from a file with max requests set"""
2326+
2327+
f = None
2328+
2329+
try:
2330+
self._create_file('file', 16*1024*1024*'\0')
2331+
2332+
f = await sftp.open('file', max_requests=4)
2333+
self.assertEqual(len(await f.read()), 16*1024*1024)
2334+
finally:
2335+
if f: # pragma: no branch
2336+
await f.close()
2337+
2338+
remove('file')
2339+
23082340
def test_open_read_out_of_order(self):
23092341
"""Test parallel read with out-of-order responses"""
23102342

0 commit comments

Comments
 (0)