Skip to content

Commit 54ecac9

Browse files
ponyisi and Copilot authored
Add back concurrency semaphore (#593)
* Add back concurrency limits
* Protect Contents retrieval

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent da2a778 commit 54ecac9

File tree

5 files changed

+83
-48
lines changed

5 files changed

+83
-48
lines changed

servicex/minio_adapter.py

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,22 +28,28 @@
2828
import os.path
2929
from hashlib import sha1
3030
from pathlib import Path
31-
from typing import List
31+
from typing import List, Optional
3232

3333
from tenacity import retry, stop_after_attempt, wait_random_exponential
3434

3535
import aioboto3
3636
from boto3.s3.transfer import TransferConfig
37+
import asyncio
3738

3839
from servicex.models import ResultFile, TransformStatus
3940

40-
_transferconfig = TransferConfig(max_concurrency=10)
41+
# Maximum five simultaneous streams per individual file download
42+
_transferconfig = TransferConfig(max_concurrency=5)
43+
# Maximum ten files simultaneously being downloaded (configurable with init_s3_config)
44+
_file_transfer_sem = asyncio.Semaphore(10)
45+
# Maximum five buckets being queried at once
46+
_bucket_list_sem = asyncio.Semaphore(5)
4147

4248

4349
def init_s3_config(concurrency: int = 10):
4450
"Update the number of concurrent connections"
45-
global _transferconfig
46-
_transferconfig = TransferConfig(max_concurrency=concurrency)
51+
global _file_transfer_sem
52+
_file_transfer_sem = asyncio.Semaphore(concurrency)
4753

4854

4955
def _sanitize_filename(fname: str):
@@ -85,24 +91,31 @@ def for_transform(cls, transform: TransformStatus):
8591
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
8692
)
8793
async def list_bucket(self) -> List[ResultFile]:
88-
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
89-
bucket = await s3.Bucket(self.bucket)
90-
objects = bucket.objects.all()
91-
return [
92-
ResultFile(
93-
filename=obj.key,
94-
size=await obj.size,
95-
extension=obj.key.split(".")[-1],
96-
)
97-
async for obj in objects
98-
if not obj.key.endswith("/")
99-
]
94+
async with _bucket_list_sem:
95+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
96+
paginator = s3.get_paginator("list_objects_v2")
97+
pagination = paginator.paginate(Bucket=self.bucket)
98+
listing = await pagination.build_full_result()
99+
rv = [
100+
ResultFile(
101+
filename=_["Key"],
102+
size=_["Size"],
103+
extension=_["Key"].split(".")[-1],
104+
)
105+
for _ in listing.get("Contents", [])
106+
if not _["Key"].endswith("/")
107+
]
108+
return rv
100109

101110
@retry(
102111
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
103112
)
104113
async def download_file(
105-
self, object_name: str, local_dir: str, shorten_filename: bool = False
114+
self,
115+
object_name: str,
116+
local_dir: str,
117+
shorten_filename: bool = False,
118+
expected_size: Optional[int] = None,
106119
) -> Path:
107120
os.makedirs(local_dir, exist_ok=True)
108121
path = Path(
@@ -114,16 +127,26 @@ async def download_file(
114127
)
115128
)
116129

117-
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
118-
obj = await s3.Object(self.bucket, object_name)
119-
remotesize = await obj.content_length
130+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
131+
if expected_size is not None:
132+
remotesize = expected_size
133+
else:
134+
async with _file_transfer_sem:
135+
info = await s3.head_object(Bucket=self.bucket, Key=object_name)
136+
remotesize = info["ContentLength"]
120137
if path.exists():
121138
# if file size is the same, let's not download anything
122139
# maybe move to a better verification mechanism with e-tags in the future
123140
localsize = path.stat().st_size
124141
if localsize == remotesize:
125142
return path.resolve()
126-
await obj.download_file(path.as_posix(), Config=_transferconfig)
143+
async with _file_transfer_sem:
144+
await s3.download_file(
145+
Bucket=self.bucket,
146+
Key=object_name,
147+
Filename=path.as_posix(),
148+
Config=_transferconfig,
149+
)
127150
localsize = path.stat().st_size
128151
if localsize != remotesize:
129152
raise RuntimeError(f"Download of {object_name} failed")

servicex/query_core.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,9 +533,13 @@ async def download_file(
533533
progress: Progress,
534534
download_progress: TaskID,
535535
shorten_filename: bool = False,
536+
expected_size: Optional[int] = None,
536537
):
537538
downloaded_filename = await minio.download_file(
538-
filename, self.download_path, shorten_filename=shorten_filename
539+
filename,
540+
self.download_path,
541+
shorten_filename=shorten_filename,
542+
expected_size=expected_size,
539543
)
540544
result_uris.append(downloaded_filename.as_posix())
541545
progress.advance(task_id=download_progress, task_type="Download")
@@ -580,6 +584,7 @@ async def get_signed_url(
580584
progress,
581585
download_progress,
582586
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
587+
expected_size=file.size,
583588
)
584589
)
585590
) # NOQA 501

tests/app/test_datasets.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ def dataset():
6464
return cached_dataset
6565

6666

67-
@pytest.mark.asyncio
6867
def test_datasets_list(script_runner, dataset):
6968
with patch("servicex.app.datasets.ServiceXClient") as mock_servicex:
7069
mock_get_datasets = AsyncMock(return_value=[dataset])

tests/test_minio_adapter.py

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
2626
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
2727
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28-
from unittest.mock import AsyncMock
2928
import urllib.parse
3029

3130
import pytest
@@ -48,14 +47,6 @@ def mock_downloader(**args):
4847
open("/tmp/foo/test.txt", "wb").write(b"\x01" * 10)
4948

5049

51-
@fixture
52-
def download_patch():
53-
import aioboto3.s3.inject
54-
55-
aioboto3.s3.inject.download_file = AsyncMock(side_effect=mock_downloader)
56-
return aioboto3.s3.inject.download_file
57-
58-
5950
@fixture
6051
def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter:
6152
urlinfo = urllib.parse.urlparse(moto_services["s3"])
@@ -64,7 +55,7 @@ def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter:
6455
)
6556

6657

67-
@fixture
58+
@fixture(scope="function")
6859
async def populate_bucket(request, minio_adapter):
6960
async with minio_adapter.minio.client(
7061
"s3", endpoint_url=minio_adapter.endpoint_host
@@ -112,6 +103,19 @@ async def test_download_file(minio_adapter, populate_bucket):
112103
result.unlink() # it should exist, from above ...
113104

114105

106+
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
107+
@pytest.mark.asyncio
108+
async def test_download_file_with_expected_size(minio_adapter, populate_bucket):
109+
info = await minio_adapter.list_bucket()
110+
result = await minio_adapter.download_file(
111+
"test.txt", local_dir="/tmp/foo", expected_size=info[0].size
112+
)
113+
assert str(result).endswith("test.txt")
114+
assert result.exists()
115+
assert result.read_bytes() == (b"\x01" * 10)
116+
result.unlink() # it should exist, from above ...
117+
118+
115119
@pytest.mark.parametrize("populate_bucket", ["t::est.txt"], indirect=True)
116120
@pytest.mark.asyncio
117121
async def test_download_bad_filename(minio_adapter, populate_bucket):
@@ -160,19 +164,19 @@ async def test_download_short_filename_change(minio_adapter, populate_bucket):
160164
result.unlink() # it should exist, from above ...
161165

162166

163-
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
167+
@pytest.mark.parametrize("populate_bucket", ["test2.txt"], indirect=True)
164168
@pytest.mark.asyncio
165169
async def test_download_repeat(minio_adapter, populate_bucket):
166170
import asyncio
167171

168172
print(await minio_adapter.list_bucket())
169-
result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo")
170-
assert str(result).endswith("test.txt")
173+
result = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo")
174+
assert str(result).endswith("test2.txt")
171175
assert result.exists()
172176
t0 = result.stat().st_mtime_ns
173177
await asyncio.sleep(4) # hopefully long enough for Windows/FAT32 ... ?
174178

175-
result2 = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo")
179+
result2 = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo")
176180
assert result2.exists()
177181
assert result2 == result
178182
assert t0 == result2.stat().st_mtime_ns
@@ -181,15 +185,19 @@ async def test_download_repeat(minio_adapter, populate_bucket):
181185

182186
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
183187
@pytest.mark.asyncio
184-
async def test_download_file_retry(download_patch, minio_adapter, populate_bucket):
188+
async def test_get_signed_url(minio_adapter, moto_services, populate_bucket):
189+
result = await minio_adapter.get_signed_url("test.txt")
190+
assert result.startswith(moto_services["s3"])
191+
192+
193+
@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True)
194+
@pytest.mark.asyncio
195+
async def test_download_file_retry(minio_adapter, populate_bucket, mocker):
196+
download_patch = mocker.patch(
197+
"aioboto3.s3.inject.download_file", side_effect=mock_downloader
198+
)
185199
result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo")
186200
assert str(result).endswith("test.txt")
187201
assert result.exists()
188-
assert len(download_patch.call_args_list) == 3
202+
assert download_patch.call_count == 3
189203
result.unlink()
190-
191-
192-
@pytest.mark.asyncio
193-
async def test_get_signed_url(minio_adapter, moto_services):
194-
result = await minio_adapter.get_signed_url("test.txt")
195-
assert result.startswith(moto_services["s3"])

tests/test_servicex_dataset.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ async def test_submit(mocker):
216216
mock_minio = AsyncMock()
217217
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
218218
mock_minio.download_file = AsyncMock(
219-
side_effect=lambda a, _, shorten_filename: PurePath(a)
219+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
220220
)
221221

222222
mock_cache = mocker.MagicMock(QueryCache)
@@ -260,7 +260,7 @@ async def test_submit_partial_success(mocker):
260260
mock_minio = AsyncMock()
261261
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
262262
mock_minio.download_file = AsyncMock(
263-
side_effect=lambda a, _, shorten_filename: PurePath(a)
263+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
264264
)
265265

266266
mock_cache = mocker.MagicMock(QueryCache)
@@ -303,7 +303,7 @@ async def test_use_of_cache(mocker):
303303
mock_minio = AsyncMock()
304304
mock_minio.list_bucket = AsyncMock(return_value=[file1, file2])
305305
mock_minio.download_file = AsyncMock(
306-
side_effect=lambda a, _, shorten_filename: PurePath(a)
306+
side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a)
307307
)
308308
mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"])
309309

0 commit comments

Comments
 (0)