Skip to content

Commit 6517857

Browse files
authored
Switch S3 client to boto3 (#572)
* Limit simultaneous downloads; retry failed downloads
* Better handling of fail_if_incomplete, access to set concurrency
* Switch S3 client to boto3
1 parent 8288202 commit 6517857

File tree

7 files changed

+171
-111
lines changed

7 files changed

+171
-111
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ dependencies = [
3838
"pydantic>=2.6.0",
3939
"aiohttp-retry>=2.8.3",
4040
"httpx>=0.24",
41-
"miniopy-async>=1.20.1",
41+
"aioboto3>=14.1.0",
4242
"tinydb>=4.7",
4343
"google-auth>=2.17",
4444
"typer>=0.12.1",
@@ -47,7 +47,6 @@ dependencies = [
4747
"importlib_metadata; python_version <= '3.9'",
4848
"typing_extensions; python_version <= '3.10'", # compatible versions controlled through pydantic
4949
"rich>=13.0.0", # databinder
50-
"aiofile", # compatible versions controlled through miniopy-async
5150
"make-it-sync", # compatible versions controlled through func_adl
5251
"ruamel.yaml>=0.18.7",
5352
"filelock>=3.12.0",
@@ -80,6 +79,7 @@ test = [
8079
"pandas>=2.0.2",
8180
"pyarrow>=12.0.0",
8281
"pre-commit>=4.0.1",
82+
"pytest-aioboto3>=0.6.0",
8383
]
8484
docs = [
8585
"sphinx>=7.0.1, <8.2.0",

servicex/minio_adapter.py

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2022, IRIS-HEP
1+
# Copyright (c) 2022-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -30,13 +30,21 @@
3030
from pathlib import Path
3131
from typing import List
3232

33-
import aiohttp
33+
import asyncio
3434
from tenacity import retry, stop_after_attempt, wait_random_exponential
3535

36-
from miniopy_async import Minio
36+
import aioboto3
3737

3838
from servicex.models import ResultFile, TransformStatus
3939

40+
_semaphore = asyncio.Semaphore(8)
41+
42+
43+
def init_download_semaphore(concurrency: int = 8):
44+
"Update the number of concurrent connections"
45+
global _semaphore
46+
_semaphore = asyncio.Semaphore(concurrency)
47+
4048

4149
def _sanitize_filename(fname: str):
4250
"No matter the string given, make it an acceptable filename on all platforms"
@@ -56,10 +64,11 @@ def __init__(
5664
secret_key: str,
5765
bucket: str,
5866
):
59-
self.minio = Minio(
60-
endpoint_host, access_key=access_key, secret_key=secret_key, secure=secure
67+
self.minio = aioboto3.Session(
68+
aws_access_key_id=access_key, aws_secret_access_key=secret_key
6169
)
6270

71+
self.endpoint_host = ("https://" if secure else "http://") + endpoint_host
6372
self.bucket = bucket
6473

6574
@classmethod
@@ -76,16 +85,18 @@ def for_transform(cls, transform: TransformStatus):
7685
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
7786
)
7887
async def list_bucket(self) -> List[ResultFile]:
79-
objects = await self.minio.list_objects(self.bucket)
80-
return [
81-
ResultFile(
82-
filename=obj.object_name,
83-
size=obj.size,
84-
extension=obj.object_name.split(".")[-1],
85-
)
86-
for obj in objects
87-
if not obj.is_dir
88-
]
88+
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
89+
bucket = await s3.Bucket(self.bucket)
90+
objects = bucket.objects.all()
91+
return [
92+
ResultFile(
93+
filename=obj.key,
94+
size=await obj.size,
95+
extension=obj.key.split(".")[-1],
96+
)
97+
async for obj in objects
98+
if not obj.key.endswith("/")
99+
]
89100

90101
@retry(
91102
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
@@ -103,24 +114,31 @@ async def download_file(
103114
)
104115
)
105116

106-
async with aiohttp.ClientSession(
107-
timeout=aiohttp.ClientTimeout(total=10 * 60)
108-
) as session:
109-
_ = await self.minio.fget_object(
110-
bucket_name=self.bucket,
111-
object_name=object_name,
112-
file_path=path.as_posix(),
113-
session=session,
114-
)
117+
async with _semaphore:
118+
async with self.minio.resource("s3", endpoint_url=self.endpoint_host) as s3:
119+
obj = await s3.Object(self.bucket, object_name)
120+
remotesize = await obj.content_length
121+
if path.exists():
122+
# if file size is the same, let's not download anything
123+
# maybe move to a better verification mechanism with e-tags in the future
124+
localsize = path.stat().st_size
125+
if localsize == remotesize:
126+
return path.resolve()
127+
await obj.download_file(path.as_posix())
128+
localsize = path.stat().st_size
129+
if localsize != remotesize:
130+
raise RuntimeError(f"Download of {object_name} failed")
115131
return path.resolve()
116132

117133
@retry(
118134
stop=stop_after_attempt(3), wait=wait_random_exponential(max=60), reraise=True
119135
)
120136
async def get_signed_url(self, object_name: str) -> str:
121-
return await self.minio.get_presigned_url(
122-
bucket_name=self.bucket, object_name=object_name, method="GET"
123-
)
137+
async with _semaphore:
138+
async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3:
139+
return await s3.generate_presigned_url(
140+
"get_object", Params={"Bucket": self.bucket, "Key": object_name}
141+
)
124142

125143
@classmethod
126144
def hash_path(cls, file_name):

servicex/query_core.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2024, IRIS-HEP
1+
# Copyright (c) 2024-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -204,14 +204,14 @@ def transform_complete(task: Task):
204204
f'ServiceX Exception for request ID {self.request_id} ({self.title})"',
205205
exc_info=task.exception(),
206206
)
207-
self.cache.delete_record_by_request_id(self.request_id)
208-
if download_files_task:
209-
download_files_task.cancel("Transform failed")
207+
if self.fail_if_incomplete:
208+
self.cache.delete_record_by_request_id(self.request_id)
209+
if download_files_task:
210+
download_files_task.cancel("Transform failed")
210211
raise task.exception()
211212

212213
if self.current_status.status in DONE_STATUS:
213214
if self.current_status.files_failed:
214-
self.cache.delete_record_by_request_id(self.request_id)
215215
titlestr = (
216216
f'"{self.current_status.title}" '
217217
if self.current_status.title is not None
@@ -220,7 +220,8 @@ def transform_complete(task: Task):
220220
errorstr = (
221221
f"Transform {titlestr}completed with failures: "
222222
f"{self.current_status.files_failed}/"
223-
f"{self.current_status.files} files failed. Will not cache."
223+
f"{self.current_status.files} files failed."
224+
f"{'Will not cache.' if self.fail_if_incomplete else ''}"
224225
)
225226
failedfiles = (
226227
self.servicex.url
@@ -231,6 +232,7 @@ def transform_complete(task: Task):
231232
"A list of failed files is at [bold red on white]"
232233
f"[link={failedfiles}]this link[/link][/bold red on white]"
233234
)
235+
logger.error(errorstr)
234236
logger.error(errorstr2)
235237
logger.error(
236238
f"Transform Request id: {self.current_status.request_id}"
@@ -246,7 +248,10 @@ def transform_complete(task: Task):
246248
f"More information of '{self.title}' [bold red on white][link={kibana_link}]HERE[/link][/bold red on white]" # NOQA: E501
247249
)
248250
if self.fail_if_incomplete:
251+
self.cache.delete_record_by_request_id(self.request_id)
249252
raise ServiceXException(errorstr)
253+
else:
254+
logger.error("Will continue to download what is available")
250255
else:
251256
logger.info("Transforms completed successfully")
252257
else: # pragma: no cover

servicex/servicex_client.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2022, IRIS-HEP
1+
# Copyright (c) 2022-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -228,6 +228,7 @@ def deliver(
228228
fail_if_incomplete: bool = True,
229229
ignore_local_cache: bool = False,
230230
progress_bar: ProgressBarFormat = ProgressBarFormat.default,
231+
concurrency: int = 8,
231232
):
232233
r"""
233234
Execute a ServiceX query.
@@ -249,9 +250,13 @@ def deliver(
249250
will have its own progress bars; :py:const:`ProgressBarFormat.compact` gives one
250251
summary progress bar for all transformations; :py:const:`ProgressBarFormat.none`
251252
switches off progress bars completely.
253+
:param concurrency: specify how many downloads to run in parallel (default is 8).
252254
:return: A dictionary mapping the name of each :py:class:`Sample` to a :py:class:`.GuardList`
253255
with the file names or URLs for the outputs.
254256
"""
257+
from .minio_adapter import init_download_semaphore
258+
259+
init_download_semaphore(concurrency)
255260
config = _load_ServiceXSpec(spec)
256261

257262
if ignore_local_cache or config.General.IgnoreLocalCache:

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,3 +222,9 @@ def codegen_list():
222222
"uproot": "http://servicex-code-gen-uproot:8000",
223223
"uproot-raw": "http://servicex-code-gen-uproot-raw:8000",
224224
}
225+
226+
227+
# This exists to force an async event loop to be created
228+
@fixture
229+
async def with_event_loop():
230+
pass

tests/test_databinder.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ def test_invalid_dataset_identifier():
286286
)
287287

288288

289-
def test_submit_mapping(transformed_result, codegen_list):
289+
def test_submit_mapping(transformed_result, codegen_list, with_event_loop):
290290
from servicex import deliver
291291

292292
spec = {
@@ -316,7 +316,9 @@ def test_submit_mapping(transformed_result, codegen_list):
316316
assert list(results["sampleA"]) == ["1.parquet"]
317317

318318

319-
def test_submit_mapping_signed_urls(transformed_result_signed_url, codegen_list):
319+
def test_submit_mapping_signed_urls(
320+
transformed_result_signed_url, codegen_list, with_event_loop
321+
):
320322
from servicex import deliver
321323

322324
spec = {
@@ -347,7 +349,7 @@ def test_submit_mapping_signed_urls(transformed_result_signed_url, codegen_list)
347349
]
348350

349351

350-
def test_submit_mapping_failure(transformed_result, codegen_list):
352+
def test_submit_mapping_failure(transformed_result, codegen_list, with_event_loop):
351353
from servicex import deliver
352354

353355
spec = {
@@ -378,7 +380,7 @@ def test_submit_mapping_failure(transformed_result, codegen_list):
378380
pass
379381

380382

381-
def test_submit_mapping_failure_signed_urls(codegen_list):
383+
def test_submit_mapping_failure_signed_urls(codegen_list, with_event_loop):
382384
from servicex import deliver
383385

384386
spec = {
@@ -634,7 +636,7 @@ def run_query(input_filenames=None):
634636
_load_ServiceXSpec(path2)
635637

636638

637-
def test_funcadl_query(transformed_result, codegen_list):
639+
def test_funcadl_query(transformed_result, codegen_list, with_event_loop):
638640
from servicex import deliver
639641
from servicex.query import FuncADL_Uproot # type: ignore
640642

@@ -664,7 +666,7 @@ def test_funcadl_query(transformed_result, codegen_list):
664666
deliver(spec, config_path="tests/example_config.yaml")
665667

666668

667-
def test_query_with_codegen_override(transformed_result, codegen_list):
669+
def test_query_with_codegen_override(transformed_result, codegen_list, with_event_loop):
668670
from servicex import deliver
669671
from servicex.query import FuncADL_Uproot # type: ignore
670672

@@ -748,7 +750,7 @@ def test_databinder_load_dict():
748750
)
749751

750752

751-
def test_python_query(transformed_result, codegen_list):
753+
def test_python_query(transformed_result, codegen_list, with_event_loop):
752754
from servicex import deliver
753755
from servicex.query import PythonFunction # type: ignore
754756

@@ -782,7 +784,7 @@ def run_query(input_filenames=None):
782784
deliver(spec, config_path="tests/example_config.yaml")
783785

784786

785-
def test_uproot_raw_query(transformed_result, codegen_list):
787+
def test_uproot_raw_query(transformed_result, codegen_list, with_event_loop):
786788
from servicex import deliver
787789
from servicex.query import UprootRaw # type: ignore
788790

@@ -810,7 +812,7 @@ def test_uproot_raw_query(transformed_result, codegen_list):
810812
deliver(spec, config_path="tests/example_config.yaml")
811813

812814

813-
def test_uproot_raw_query_parquet(transformed_result, codegen_list):
815+
def test_uproot_raw_query_parquet(transformed_result, codegen_list, with_event_loop):
814816
from servicex import deliver
815817
from servicex.query import UprootRaw # type: ignore
816818

@@ -897,7 +899,7 @@ def test_generic_query(codegen_list):
897899
)
898900

899901

900-
def test_deliver_progress_options(transformed_result, codegen_list):
902+
def test_deliver_progress_options(transformed_result, codegen_list, with_event_loop):
901903
from servicex import deliver, ProgressBarFormat
902904
from servicex.query import UprootRaw # type: ignore
903905

0 commit comments

Comments
 (0)