diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 88b90588..e7dc89da 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -28,22 +28,28 @@ import os.path from hashlib import sha1 from pathlib import Path -from typing import List +from typing import List, Optional from tenacity import retry, stop_after_attempt, wait_random_exponential import aioboto3 from boto3.s3.transfer import TransferConfig +import asyncio from servicex.models import ResultFile, TransformStatus -_transferconfig = TransferConfig(max_concurrency=10) +# Maximum five simultaneous streams per individual file download +_transferconfig = TransferConfig(max_concurrency=5) +# Maximum ten files simultaneously being downloaded (configurable with init_s3_config) +_file_transfer_sem = asyncio.Semaphore(10) +# Maximum five buckets being queried at once +_bucket_list_sem = asyncio.Semaphore(5) def init_s3_config(concurrency: int = 10): "Update the number of concurrent connections" - global _transferconfig - _transferconfig = TransferConfig(max_concurrency=concurrency) + global _file_transfer_sem + _file_transfer_sem = asyncio.Semaphore(concurrency) def _sanitize_filename(fname: str): @@ -85,24 +91,31 @@ def for_transform(cls, transform: TransformStatus): stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True ) async def list_bucket(self) -> List[ResultFile]: - async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3: - bucket = await s3.Bucket(self.bucket) - objects = bucket.objects.all() - return [ - ResultFile( - filename=obj.key, - size=await obj.size, - extension=obj.key.split(".")[-1], - ) - async for obj in objects - if not obj.key.endswith("/") - ] + async with _bucket_list_sem: + async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: + paginator = s3.get_paginator("list_objects_v2") + pagination = paginator.paginate(Bucket=self.bucket) + listing = await pagination.build_full_result() + rv = [ + ResultFile( + filename=_["Key"], + size=_["Size"], + extension=_["Key"].split(".")[-1], + ) + for _ in listing.get("Contents", []) + if not _["Key"].endswith("/") + ] + return rv @retry( stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True ) async def download_file( - self, object_name: str, local_dir: str, shorten_filename: bool = False + self, + object_name: str, + local_dir: str, + shorten_filename: bool = False, + expected_size: Optional[int] = None, ) -> Path: os.makedirs(local_dir, exist_ok=True) path = Path( @@ -114,16 +127,26 @@ async def download_file( ) ) - async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3: - obj = await s3.Object(self.bucket, object_name) - remotesize = await obj.content_length + async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: + if expected_size is not None: + remotesize = expected_size + else: + async with _file_transfer_sem: + info = await s3.head_object(Bucket=self.bucket, Key=object_name) + remotesize = info["ContentLength"] if path.exists(): # if file size is the same, let's not download anything # maybe move to a better verification mechanism with e-tags in the future localsize = path.stat().st_size if localsize == remotesize: return path.resolve() - await obj.download_file(path.as_posix(), Config=_transferconfig) + async with _file_transfer_sem: + await s3.download_file( + Bucket=self.bucket, + Key=object_name, + Filename=path.as_posix(), + Config=_transferconfig, + ) localsize = path.stat().st_size if localsize != remotesize: raise RuntimeError(f"Download of {object_name} failed") diff --git a/servicex/query_core.py b/servicex/query_core.py index ac4f5f09..6f54a311 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -533,9 +533,13 @@ async def download_file( progress: Progress, download_progress: TaskID, shorten_filename: bool = False, + expected_size: Optional[int] = None, ): downloaded_filename = await minio.download_file( - filename, self.download_path, shorten_filename=shorten_filename + filename, + self.download_path, + shorten_filename=shorten_filename, + expected_size=expected_size, ) result_uris.append(downloaded_filename.as_posix()) progress.advance(task_id=download_progress, task_type="Download") @@ -580,6 +584,7 @@ async def get_signed_url( progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 + expected_size=file.size, ) ) ) # NOQA 501 diff --git a/tests/app/test_datasets.py b/tests/app/test_datasets.py index 3b7230e9..4ae76c97 100644 --- a/tests/app/test_datasets.py +++ b/tests/app/test_datasets.py @@ -64,7 +64,6 @@ def dataset(): return cached_dataset -@pytest.mark.asyncio def test_datasets_list(script_runner, dataset): with patch("servicex.app.datasets.ServiceXClient") as mock_servicex: mock_get_datasets = AsyncMock(return_value=[dataset]) diff --git a/tests/test_minio_adapter.py b/tests/test_minio_adapter.py index fd1c9aa0..0487facf 100644 --- a/tests/test_minio_adapter.py +++ b/tests/test_minio_adapter.py @@ -25,7 +25,6 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from unittest.mock import AsyncMock import urllib.parse import pytest @@ -48,14 +47,6 @@ def mock_downloader(**args): open("/tmp/foo/test.txt", "wb").write(b"\x01" * 10) -@fixture -def download_patch(): - import aioboto3.s3.inject - - aioboto3.s3.inject.download_file = AsyncMock(side_effect=mock_downloader) - return aioboto3.s3.inject.download_file - - @fixture def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: urlinfo = urllib.parse.urlparse(moto_services["s3"]) @@ -64,7 +55,7 @@ def minio_adapter(moto_services, moto_patch_session) -> MinioAdapter: ) -@fixture +@fixture(scope="function") async def populate_bucket(request, minio_adapter): async with minio_adapter.minio.client( "s3", endpoint_url=minio_adapter.endpoint_host @@ -112,6 +103,19 @@ async def test_download_file(minio_adapter, populate_bucket): result.unlink() # it should exist, from above ... +@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) +@pytest.mark.asyncio +async def test_download_file_with_expected_size(minio_adapter, populate_bucket): + info = await minio_adapter.list_bucket() + result = await minio_adapter.download_file( + "test.txt", local_dir="/tmp/foo", expected_size=info[0].size + ) + assert str(result).endswith("test.txt") + assert result.exists() + assert result.read_bytes() == (b"\x01" * 10) + result.unlink() # it should exist, from above ... + + @pytest.mark.parametrize("populate_bucket", ["t::est.txt"], indirect=True) @pytest.mark.asyncio async def test_download_bad_filename(minio_adapter, populate_bucket): @@ -160,19 +164,19 @@ async def test_download_short_filename_change(minio_adapter, populate_bucket): result.unlink() # it should exist, from above ... -@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) +@pytest.mark.parametrize("populate_bucket", ["test2.txt"], indirect=True) @pytest.mark.asyncio async def test_download_repeat(minio_adapter, populate_bucket): import asyncio print(await minio_adapter.list_bucket()) - result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") - assert str(result).endswith("test.txt") + result = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo") + assert str(result).endswith("test2.txt") assert result.exists() t0 = result.stat().st_mtime_ns await asyncio.sleep(4) # hopefully long enough for Windows/FAT32 ... ? - result2 = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") + result2 = await minio_adapter.download_file("test2.txt", local_dir="/tmp/foo") assert result2.exists() assert result2 == result assert t0 == result2.stat().st_mtime_ns @@ -181,15 +185,19 @@ async def test_download_repeat(minio_adapter, populate_bucket): @pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) @pytest.mark.asyncio -async def test_download_file_retry(download_patch, minio_adapter, populate_bucket): +async def test_get_signed_url(minio_adapter, moto_services, populate_bucket): + result = await minio_adapter.get_signed_url("test.txt") + assert result.startswith(moto_services["s3"]) + + +@pytest.mark.parametrize("populate_bucket", ["test.txt"], indirect=True) +@pytest.mark.asyncio +async def test_download_file_retry(minio_adapter, populate_bucket, mocker): + download_patch = mocker.patch( + "aioboto3.s3.inject.download_file", side_effect=mock_downloader + ) result = await minio_adapter.download_file("test.txt", local_dir="/tmp/foo") assert str(result).endswith("test.txt") assert result.exists() - assert len(download_patch.call_args_list) == 3 + assert download_patch.call_count == 3 result.unlink() - - -@pytest.mark.asyncio -async def test_get_signed_url(minio_adapter, moto_services): - result = await minio_adapter.get_signed_url("test.txt") - assert result.startswith(moto_services["s3"]) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 628dc2e0..28cbd0f7 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -216,7 +216,7 @@ async def test_submit(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_cache = mocker.MagicMock(QueryCache) @@ -260,7 +260,7 @@ async def test_submit_partial_success(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_cache = mocker.MagicMock(QueryCache) @@ -303,7 +303,7 @@ async def test_use_of_cache(mocker): mock_minio = AsyncMock() mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) + side_effect=lambda a, _, shorten_filename, expected_size: PurePath(a) ) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"])