Skip to content

Commit 69286a8

Browse files
authored
Upgrade local download concurrency control to use aioboto3's built-in handling (#579)
* Use aioboto3's built-in concurrency handling * Add test coverage
1 parent 40f65bd commit 69286a8

File tree

3 files changed

+43
-26
lines changed

3 files changed

+43
-26
lines changed

servicex/minio_adapter.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@
3030
from pathlib import Path
3131
from typing import List
3232

33-
import asyncio
3433
from tenacity import retry, stop_after_attempt, wait_random_exponential
3534

3635
import aioboto3
36+
from boto3.s3.transfer import TransferConfig
3737

3838
from servicex.models import ResultFile, TransformStatus
3939

40-
_semaphore = asyncio.Semaphore(8)
40+
_transferconfig = TransferConfig(max_concurrency=10)
4141

4242

43-
def init_download_semaphore(concurrency: int = 8):
43+
def init_s3_config(concurrency: int = 10):
4444
"Update the number of concurrent connections"
45-
global _semaphore
46-
_semaphore = asyncio.Semaphore(concurrency)
45+
global _transferconfig
46+
_transferconfig = TransferConfig(max_concurrency=concurrency)
4747

4848

4949
def _sanitize_filename(fname: str):
@@ -114,31 +114,29 @@ async def download_file(
114114
)
115115
)
116116

117-
async with _semaphore:
118-
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
119-
obj = await s3.Object(self.bucket, object_name)
120-
remotesize = await obj.content_length
121-
if path.exists():
122-
# if file size is the same, let's not download anything
123-
# maybe move to a better verification mechanism with e-tags in the future
124-
localsize = path.stat().st_size
125-
if localsize == remotesize:
126-
return path.resolve()
127-
await obj.download_file(path.as_posix())
117+
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
118+
obj = await s3.Object(self.bucket, object_name)
119+
remotesize = await obj.content_length
120+
if path.exists():
121+
# if file size is the same, let's not download anything
122+
# maybe move to a better verification mechanism with e-tags in the future
128123
localsize = path.stat().st_size
129-
if localsize != remotesize:
130-
raise RuntimeError(f"Download of {object_name} failed")
124+
if localsize == remotesize:
125+
return path.resolve()
126+
await obj.download_file(path.as_posix(), Config=_transferconfig)
127+
localsize = path.stat().st_size
128+
if localsize != remotesize:
129+
raise RuntimeError(f"Download of {object_name} failed")
131130
return path.resolve()
132131

133132
@retry(
134133
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
135134
)
136135
async def get_signed_url(self, object_name: str) -> str:
137-
async with _semaphore:
138-
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
139-
return await s3.generate_presigned_url(
140-
"get_object", Params={"Bucket": self.bucket, "Key": object_name}
141-
)
136+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
137+
return await s3.generate_presigned_url(
138+
"get_object", Params={"Bucket": self.bucket, "Key": object_name}
139+
)
142140

143141
@classmethod
144142
def hash_path(cls, file_name):

servicex/servicex_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def deliver(
228228
fail_if_incomplete: bool = True,
229229
ignore_local_cache: bool = False,
230230
progress_bar: ProgressBarFormat = ProgressBarFormat.default,
231-
concurrency: int = 8,
231+
concurrency: int = 10,
232232
):
233233
r"""
234234
Execute a ServiceX query.
@@ -254,9 +254,9 @@ def deliver(
254254
:return: A dictionary mapping the name of each :py:class:`Sample` to a :py:class:`.GuardList`
255255
with the file names or URLs for the outputs.
256256
"""
257-
from .minio_adapter import init_download_semaphore
257+
from .minio_adapter import init_s3_config
258258

259-
init_download_semaphore(concurrency)
259+
init_s3_config(concurrency)
260260
config = _load_ServiceXSpec(spec)
261261

262262
if ignore_local_cache or config.General.IgnoreLocalCache:

tests/test_minio_adapter.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,25 @@ async def test_download_short_filename_change(minio_adapter, populate_bucket):
160160
result.unlink() # it should exist, from above ...
161161

162162

163+
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
164+
@pytest.mark.asyncio
165+
async def test_download_repeat(minio_adapter, populate_bucket):
166+
import asyncio
167+
168+
print(await minio_adapter.list_bucket())
169+
result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo")
170+
assert str(result).endswith("test.txt")
171+
assert result.exists()
172+
t0 = result.stat().st_mtime_ns
173+
await asyncio.sleep(4) # hopefully long enough for Windows/FAT32 ... ?
174+
175+
result2 = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo")
176+
assert result2.exists()
177+
assert result2 == result
178+
assert t0 == result2.stat().st_mtime_ns
179+
result.unlink() # it should exist, from above ...
180+
181+
163182
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
164183
@pytest.mark.asyncio
165184
async def test_download_file_retry(download_patch, minio_adapter, populate_bucket):

0 commit comments

Comments
 (0)