From 2f3fa85a9b613adeeed006e4d2c6fa78592a0a9e Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Sat, 17 May 2025 04:23:43 +0000 Subject: [PATCH 1/6] Add back concurrency limits --- servicex/minio_adapter.py | 62 ++++++++++++++++++++++------------ servicex/query_core.py | 7 +++- tests/test_minio_adapter.py | 13 +++++++ tests/test_servicex_dataset.py | 6 ++-- 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 88b90588..973fc398 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -28,22 +28,25 @@ import os.path from hashlib import sha1 from pathlib import Path -from typing import List +from typing import List, Optional from tenacity import retry, stop_after_attempt, wait_random_exponential import aioboto3 from boto3.s3.transfer import TransferConfig +import asyncio from servicex.models import ResultFile, TransformStatus -_transferconfig = TransferConfig(max_concurrency=10) +_transferconfig = TransferConfig(max_concurrency=5) +_sem = asyncio.Semaphore(10) +_bucket_list_sem = asyncio.Semaphore(5) def init_s3_config(concurrency: int = 10): "Update the number of concurrent connections" - global _transferconfig - _transferconfig = TransferConfig(max_concurrency=concurrency) + global _sem + _sem = asyncio.Semaphore(concurrency) def _sanitize_filename(fname: str): @@ -85,24 +88,31 @@ def for_transform(cls, transform: TransformStatus): stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True ) async def list_bucket(self) -> List[ResultFile]: - async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3: - bucket = await s3.Bucket(self.bucket) - objects = bucket.objects.all() - return [ - ResultFile( - filename=obj.key, - size=await obj.size, - extension=obj.key.split(".")[-1], - ) - async for obj in objects - if not obj.key.endswith("/") - ] + async with _bucket_list_sem: + async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: + paginator = s3.get_paginator("list_objects_v2") + pagination = paginator.paginate(Bucket=self.bucket) + listing = await pagination.build_full_result() + rv = [ + ResultFile( + filename=_["Key"], + size=_["Size"], + extension=_["Key"].split(".")[-1], + ) + for _ in listing["Contents"] + if not _["Key"].endswith("/") + ] + return rv @retry( stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True ) async def download_file( - self, object_name: str, local_dir: str, shorten_filename: bool = False + self, + object_name: str, + local_dir: str, + shorten_filename: bool = False, + expected_size: Optional[int] = None, ) -> Path: os.makedirs(local_dir, exist_ok=True) path = Path( @@ -114,16 +124,26 @@ async def download_file( ) ) - async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3: - obj = await s3.Object(self.bucket, object_name) - remotesize = await obj.content_length + async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: + if expected_size is not None: + remotesize = expected_size + else: + async with _sem: + info = await s3.head_object(Bucket=self.bucket, Key=object_name) + remotesize = info["ContentLength"] if path.exists(): # if file size is the same, let's not download anything # maybe move to a better verification mechanism with e-tags in the future localsize = path.stat().st_size if localsize == remotesize: return path.resolve() - await obj.download_file(path.as_posix(), Config=_transferconfig) + async with _sem: + await s3.download_file( + Bucket=self.bucket, + Key=object_name, + Filename=path.as_posix(), + Config=_transferconfig, + ) localsize = path.stat().st_size if localsize != remotesize: raise RuntimeError(f"Download of {object_name} failed") diff --git a/servicex/query_core.py b/servicex/query_core.py index ac4f5f09..6f54a311 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -533,9 +533,13 @@ async def download_file( progress: Progress, download_progress: TaskID, shorten_filename: bool = False, + expected_size: Optional[int] = None, ): downloaded_filename = await minio.download_file( - filename, self.download_path, shorten_filename=shorten_filename + filename, + self.download_path, + shorten_filename=shorten_filename, + expected_size=expected_size, ) result_uris.append(downloaded_filename.as_posix()) progress.advance(task_id=download_progress, task_type="Download") @@ -580,6 +584,7 @@ async def get_signed_url( progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 + expected_size=file.size, ) ) ) # NOQA 501 diff --git a/tests/test_minio_adapter.py b/tests/test_minio_adapter.py index fd1c9aa0..d985b0f0 100644 --- a/tests/test_minio_adapter.py +++ b/tests/test_minio_adapter.py @@ -112,6 +112,19 @@ async def test_download_file(minio_adapter, populate_bucket): result.unlink() # it should exist, from above ... +@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) +@pytest.mark.asyncio +async def test_download_file_with_expected_size(minio_adapter, populate_bucket): + info = await minio_adapter.list_bucket() + result = await minio_adapter.download_file( + "test.txt", local_dir="/tmp/foo", expected_size=info[0].size + ) + assert str(result).endswith("test.txt") + assert result.exists() + assert result.read_bytes() == (b"\x01" * 10) + result.unlink() # it should exist, from above ... + + @pytest.mark.parametrize("populate_bucket", ["t::est.txt"], indirect=True) @pytest.mark.asyncio async def test_download_bad_filename(minio_adapter, populate_bucket): diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 628dc2e0..28cbd0f7 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -216,7 +216,7 @@ async def test_submit(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_cache = mocker.MagicMock(QueryCache) @@ -260,7 +260,7 @@ async def test_submit_partial_success(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_cache = mocker.MagicMock(QueryCache) @@ -303,7 +303,7 @@ async def test_use_of_cache(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) From 37b0da73d0f3c9984067591e3ae0ac7bb743413d Mon Sep 17 00:00:00 2001 From: ponyisi Date: Fri, 16 May 2025 23:44:03 -0500 Subject: [PATCH 2/6] Protect Contents retrieval Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- servicex/minio_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 973fc398..08f0b2bd 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -99,7 +99,7 @@ async def list_bucket(self) -> List[ResultFile]: size=_["Size"], extension=_["Key"].split(".")[-1], ) - for _ in listing["Contents"] + for _ in listing.get("Contents", []) if not _["Key"].endswith("/") ] return rv From e971afd4c72c2b9d6b8bc86488e2238e88c95176 Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Sat, 17 May 2025 04:23:43 +0000 Subject: [PATCH 3/6] Add back concurrency limits --- servicex/minio_adapter.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 08f0b2bd..e7dc89da 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -38,15 +38,18 @@ from servicex.models import ResultFile, TransformStatus +# Maximum five simultaneous streams per individual file download _transferconfig = TransferConfig(max_concurrency=5) -_sem = asyncio.Semaphore(10) +# Maximum ten files simultaneously being downloaded (configurable with init_s3_config) +_file_transfer_sem = asyncio.Semaphore(10) +# Maximum five buckets being queried at once _bucket_list_sem = asyncio.Semaphore(5) def init_s3_config(concurrency: int = 10): "Update the number of concurrent connections" - global _sem - _sem = asyncio.Semaphore(concurrency) + global _file_transfer_sem + _file_transfer_sem = asyncio.Semaphore(concurrency) def _sanitize_filename(fname: str): @@ -128,7 +131,7 @@ async def download_file( if expected_size is not None: remotesize = expected_size else: - async with _sem: + async with _file_transfer_sem: info = await s3.head_object(Bucket=self.bucket, Key=object_name) remotesize = info["ContentLength"] if path.exists(): @@ -137,7 +140,7 @@ async def download_file( localsize = path.stat().st_size if localsize == remotesize: return path.resolve() - async with _sem: + async with _file_transfer_sem: await s3.download_file( Bucket=self.bucket, Key=object_name, From f80ec490c066857968681b4f3f1f5a256a269718 Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Tue, 20 May 2025 04:35:45 +0000 Subject: [PATCH 4/6] Possible Windows test fix --- tests/test_minio_adapter.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/test_minio_adapter.py b/tests/test_minio_adapter.py index d985b0f0..df6f1a0f 100644 --- a/tests/test_minio_adapter.py +++ b/tests/test_minio_adapter.py @@ -34,8 +34,12 @@ from servicex.minio_adapter import MinioAdapter from servicex.models import ResultFile +import asyncio + DOWNLOAD_PATCH_COUNTER = 0 +_semaphore = asyncio.Semaphore(1) + def mock_downloader(**args): global DOWNLOAD_PATCH_COUNTER @@ -66,15 +70,16 @@ def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: @fixture async def populate_bucket(request, minio_adapter): - async with minio_adapter.minio.client( - "s3", endpoint_url=minio_adapter.endpoint_host - ) as s3: - await s3.create_bucket(Bucket=minio_adapter.bucket) - await s3.put_object( - Bucket=minio_adapter.bucket, Key=request.param, Body=b"\x01" * 10 - ) - yield - await s3.delete_object(Bucket=minio_adapter.bucket, Key=request.param) + async with _semaphore: + async with minio_adapter.minio.client( + "s3", endpoint_url=minio_adapter.endpoint_host + ) as s3: + await s3.create_bucket(Bucket=minio_adapter.bucket) + await s3.put_object( + Bucket=minio_adapter.bucket, Key=request.param, Body=b"\x01" * 10 + ) + yield + await s3.delete_object(Bucket=minio_adapter.bucket, Key=request.param) @fixture From 2a172a6845edd389e7773820910dcac125401e4a Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Tue, 20 May 2025 05:26:34 +0000 Subject: [PATCH 5/6] Reorder tests --- tests/app/test_datasets.py | 1 - tests/test_minio_adapter.py | 49 +++++++++++++++++-------------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/tests/app/test_datasets.py b/tests/app/test_datasets.py index 3b7230e9..4ae76c97 100644 --- a/tests/app/test_datasets.py +++ b/tests/app/test_datasets.py @@ -64,7 +64,6 @@ def dataset(): return cached_dataset -@pytest.mark.asyncio def test_datasets_list(script_runner, dataset): with patch("servicex.app.datasets.ServiceXClient") as mock_servicex: mock_get_datasets = AsyncMock(return_value=[dataset]) diff --git a/tests/test_minio_adapter.py b/tests/test_minio_adapter.py index df6f1a0f..83678e3a 100644 --- a/tests/test_minio_adapter.py +++ b/tests/test_minio_adapter.py @@ -34,12 +34,8 @@ from servicex.minio_adapter import MinioAdapter from servicex.models import ResultFile -import asyncio - DOWNLOAD_PATCH_COUNTER = 0 -_semaphore = asyncio.Semaphore(1) - def mock_downloader(**args): global DOWNLOAD_PATCH_COUNTER @@ -70,16 +66,15 @@ def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: @fixture async def populate_bucket(request, minio_adapter): - async with _semaphore: - async with minio_adapter.minio.client( - "s3", endpoint_url=minio_adapter.endpoint_host - ) as s3: - await s3.create_bucket(Bucket=minio_adapter.bucket) - await s3.put_object( - Bucket=minio_adapter.bucket, Key=request.param, Body=b"\x01" * 10 - ) - yield - await s3.delete_object(Bucket=minio_adapter.bucket, Key=request.param) + async with minio_adapter.minio.client( + "s3", endpoint_url=minio_adapter.endpoint_host + ) as s3: + await s3.create_bucket(Bucket=minio_adapter.bucket) + await s3.put_object( + Bucket=minio_adapter.bucket, Key=request.param, Body=b"\x01" * 10 + ) + yield + await s3.delete_object(Bucket=minio_adapter.bucket, Key=request.param) @fixture @@ -180,33 +175,33 @@ async def test_download_short_filename_change(minio_adapter, populate_bucket): @pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) @pytest.mark.asyncio +async def test_download_file_retry(download_patch, minio_adapter, populate_bucket): + result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") + assert str(result).endswith("test.txt") + assert result.exists() + assert len(download_patch.call_args_list) == 3 + result.unlink() + + +@pytest.mark.parametrize("populate_bucket", ["test2.txt"], indirect=True) +@pytest.mark.asyncio async def test_download_repeat(minio_adapter, populate_bucket): import asyncio print(await minio_adapter.list_bucket()) - result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") - assert str(result).endswith("test.txt") + result = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo") + assert str(result).endswith("test2.txt") assert result.exists() t0 = result.stat().st_mtime_ns await asyncio.sleep(4) # hopefully long enough for Windows/FAT32 ... ? - result2 = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") + result2 = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo") assert result2.exists() assert result2 == result assert t0 == result2.stat().st_mtime_ns result.unlink() # it should exist, from above ... -@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) -@pytest.mark.asyncio -async def test_download_file_retry(download_patch, minio_adapter, populate_bucket): - result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") - assert str(result).endswith("test.txt") - assert result.exists() - assert len(download_patch.call_args_list) == 3 - result.unlink() - - @pytest.mark.asyncio async def test_get_signed_url(minio_adapter, moto_services): result = await minio_adapter.get_signed_url("test.txt") From 5cb8f4d0006ddd7072dcc9d6c1e24d70a21d350c Mon Sep 17 00:00:00 2001 From: Peter Onyisi Date: Tue, 20 May 2025 06:02:30 +0000 Subject: [PATCH 6/6] More patch changes --- tests/test_minio_adapter.py | 37 ++++++++++++++++--------------------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/tests/test_minio_adapter.py b/tests/test_minio_adapter.py index 83678e3a..0487facf 100644 --- a/tests/test_minio_adapter.py +++ b/tests/test_minio_adapter.py @@ -25,7 +25,6 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from unittest.mock import AsyncMock import urllib.parse import pytest @@ -48,14 +47,6 @@ def mock_downloader(**args): open("/tmp/foo/test.txt", "wb").write(b"\x01" * 10) -@fixture -def download_patch(): - import aioboto3.s3.inject - - aioboto3.s3.inject.download_file = AsyncMock(side_effect=mock_downloader) - return aioboto3.s3.inject.download_file - - @fixture def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: urlinfo = urllib.parse.urlparse(moto_services["s3"]) @@ -64,7 +55,7 @@ def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: ) -@fixture +@fixture(scope="function") async def populate_bucket(request, minio_adapter): async with minio_adapter.minio.client( "s3", endpoint_url=minio_adapter.endpoint_host @@ -173,16 +164,6 @@ async def test_download_short_filename_change(minio_adapter, populate_bucket): result.unlink() # it should exist, from above ... -@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) -@pytest.mark.asyncio -async def test_download_file_retry(download_patch, minio_adapter, populate_bucket): - result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") - assert str(result).endswith("test.txt") - assert result.exists() - assert len(download_patch.call_args_list) == 3 - result.unlink() - - @pytest.mark.parametrize("populate_bucket", ["test2.txt"], indirect=True) @pytest.mark.asyncio async def test_download_repeat(minio_adapter, populate_bucket): @@ -202,7 +183,21 @@ async def test_download_repeat(minio_adapter, populate_bucket): result.unlink() # it should exist, from above ... +@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) @pytest.mark.asyncio -async def test_get_signed_url(minio_adapter, moto_services): +async def test_get_signed_url(minio_adapter, moto_services, populate_bucket): result = await minio_adapter.get_signed_url("test.txt") assert result.startswith(moto_services["s3"]) + + +@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) +@pytest.mark.asyncio +async def test_download_file_retry(minio_adapter, populate_bucket, mocker): + download_patch = mocker.patch( + "aioboto3.s3.inject.download_file", side_effect=mock_downloader + ) + result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") + assert str(result).endswith("test.txt") + assert result.exists() + assert download_patch.call_count == 3 + result.unlink()