Skip to content

Commit 2f3fa85

Browse files
committed
Add back concurrency limits
1 parent 9c1849d commit 2f3fa85

File tree

4 files changed

+63
-25
lines changed

4 files changed

+63
-25
lines changed

servicex/minio_adapter.py

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,25 @@
2828
import os.path
2929
from hashlib import sha1
3030
from pathlib import Path
31-
from typing import List
31+
from typing import List, Optional
3232

3333
from tenacity import retry, stop_after_attempt, wait_random_exponential
3434

3535
import aioboto3
3636
from boto3.s3.transfer import TransferConfig
37+
import asyncio
3738

3839
from servicex.models import ResultFile, TransformStatus
3940

40-
_transferconfig = TransferConfig(max_concurrency=10)
41+
_transferconfig = TransferConfig(max_concurrency=5)
42+
_sem = asyncio.Semaphore(10)
43+
_bucket_list_sem = asyncio.Semaphore(5)
4144

4245

4346
def init_s3_config(concurrency: int = 10):
4447
"Update the number of concurrent connections"
45-
global _transferconfig
46-
_transferconfig = TransferConfig(max_concurrency=concurrency)
48+
global _sem
49+
_sem = asyncio.Semaphore(concurrency)
4750

4851

4952
def _sanitize_filename(fname: str):
@@ -85,24 +88,31 @@ def for_transform(cls, transform: TransformStatus):
8588
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
8689
)
8790
async def list_bucket(self) -> List[ResultFile]:
88-
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
89-
bucket = await s3.Bucket(self.bucket)
90-
objects = bucket.objects.all()
91-
return [
92-
ResultFile(
93-
filename=obj.key,
94-
size=await obj.size,
95-
extension=obj.key.split(".")[-1],
96-
)
97-
async for obj in objects
98-
if not obj.key.endswith("/")
99-
]
91+
async with _bucket_list_sem:
92+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
93+
paginator = s3.get_paginator("list_objects_v2")
94+
pagination = paginator.paginate(Bucket=self.bucket)
95+
listing = await pagination.build_full_result()
96+
rv = [
97+
ResultFile(
98+
filename=_["Key"],
99+
size=_["Size"],
100+
extension=_["Key"].split(".")[-1],
101+
)
102+
for _ in listing["Contents"]
103+
if not _["Key"].endswith("/")
104+
]
105+
return rv
100106

101107
@retry(
102108
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
103109
)
104110
async def download_file(
105-
self, object_name: str, local_dir: str, shorten_filename: bool = False
111+
self,
112+
object_name: str,
113+
local_dir: str,
114+
shorten_filename: bool = False,
115+
expected_size: Optional[int] = None,
106116
) -> Path:
107117
os.makedirs(local_dir, exist_ok=True)
108118
path = Path(
@@ -114,16 +124,26 @@ async def download_file(
114124
)
115125
)
116126

117-
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
118-
obj = await s3.Object(self.bucket, object_name)
119-
remotesize = await obj.content_length
127+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
128+
if expected_size is not None:
129+
remotesize = expected_size
130+
else:
131+
async with _sem:
132+
info = await s3.head_object(Bucket=self.bucket, Key=object_name)
133+
remotesize = info["ContentLength"]
120134
if path.exists():
121135
# if file size is the same, let's not download anything
122136
# maybe move to a better verification mechanism with e-tags in the future
123137
localsize = path.stat().st_size
124138
if localsize == remotesize:
125139
return path.resolve()
126-
await obj.download_file(path.as_posix(), Config=_transferconfig)
140+
async with _sem:
141+
await s3.download_file(
142+
Bucket=self.bucket,
143+
Key=object_name,
144+
Filename=path.as_posix(),
145+
Config=_transferconfig,
146+
)
127147
localsize = path.stat().st_size
128148
if localsize != remotesize:
129149
raise RuntimeError(f"Download of {object_name} failed")

servicex/query_core.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,9 +533,13 @@ async def download_file(
533533
progress: Progress,
534534
download_progress: TaskID,
535535
shorten_filename: bool = False,
536+
expected_size: Optional[int] = None,
536537
):
537538
downloaded_filename = await minio.download_file(
538-
filename, self.download_path, shorten_filename=shorten_filename
539+
filename,
540+
self.download_path,
541+
shorten_filename=shorten_filename,
542+
expected_size=expected_size,
539543
)
540544
result_uris.append(downloaded_filename.as_posix())
541545
progress.advance(task_id=download_progress, task_type="Download")
@@ -580,6 +584,7 @@ async def get_signed_url(
580584
progress,
581585
download_progress,
582586
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
587+
expected_size=file.size,
583588
)
584589
)
585590
) # NOQA 501

tests/test_minio_adapter.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,19 @@ async def test_download_file(minio_adapter, populate_bucket):
112112
result.unlink() # it should exist, from above ...
113113

114114

115+
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
116+
@pytest.mark.asyncio
117+
async def test_download_file_with_expected_size(minio_adapter, populate_bucket):
118+
info = await minio_adapter.list_bucket()
119+
result = await minio_adapter.download_file(
120+
"test.txt", local_dir="/tmp/foo", expected_size=info[0].size
121+
)
122+
assert str(result).endswith("test.txt")
123+
assert result.exists()
124+
assert result.read_bytes() == (b"\x01" * 10)
125+
result.unlink() # it should exist, from above ...
126+
127+
115128
@pytest.mark.parametrize("populate_bucket", ["t::est.txt"], indirect=True)
116129
@pytest.mark.asyncio
117130
async def test_download_bad_filename(minio_adapter, populate_bucket):

tests/test_servicex_dataset.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ async def test_submit(mocker):
216216
mock_minio = AsyncMock()
217217
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
218218
mock_minio.download_file = AsyncMock(
219-
side_effect=lambda a, _, shorten_filename: PurePath(a)
219+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
220220
)
221221

222222
mock_cache = mocker.MagicMock(QueryCache)
@@ -260,7 +260,7 @@ async def test_submit_partial_success(mocker):
260260
mock_minio = AsyncMock()
261261
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
262262
mock_minio.download_file = AsyncMock(
263-
side_effect=lambda a, _, shorten_filename: PurePath(a)
263+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
264264
)
265265

266266
mock_cache = mocker.MagicMock(QueryCache)
@@ -303,7 +303,7 @@ async def test_use_of_cache(mocker):
303303
mock_minio = AsyncMock()
304304
mock_minio.list_bucket = AsyncMock(return_value=[file1, file2])
305305
mock_minio.download_file = AsyncMock(
306-
side_effect=lambda a, _, shorten_filename: PurePath(a)
306+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
307307
)
308308
mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"])
309309

0 commit comments

Comments
 (0)