Skip to content

Commit e971afd

Browse files
committed
Add back concurrency limits
1 parent 37b0da7 commit e971afd

File tree

1 file changed

+8
-5
lines changed

1 file changed

+8
-5
lines changed

servicex/minio_adapter.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,18 @@
3838

3939
from servicex.models import ResultFile, TransformStatus
4040

41+
# Maximum five simultaneous streams per individual file download
4142
_transferconfig = TransferConfig(max_concurrency=5)
42-
_sem = asyncio.Semaphore(10)
43+
# Maximum ten files simultaneously being downloaded (configurable with init_s3_config)
44+
_file_transfer_sem = asyncio.Semaphore(10)
45+
# Maximum five buckets being queried at once
4346
_bucket_list_sem = asyncio.Semaphore(5)
4447

4548

4649
def init_s3_config(concurrency: int = 10):
4750
"Update the number of concurrent connections"
48-
global _sem
49-
_sem = asyncio.Semaphore(concurrency)
51+
global _file_transfer_sem
52+
_file_transfer_sem = asyncio.Semaphore(concurrency)
5053

5154

5255
def _sanitize_filename(fname: str):
@@ -128,7 +131,7 @@ async def download_file(
128131
if expected_size is not None:
129132
remotesize = expected_size
130133
else:
131-
async with _sem:
134+
async with _file_transfer_sem:
132135
info = await s3.head_object(Bucket=self.bucket, Key=object_name)
133136
remotesize = info["ContentLength"]
134137
if path.exists():
@@ -137,7 +140,7 @@ async def download_file(
137140
localsize = path.stat().st_size
138141
if localsize == remotesize:
139142
return path.resolve()
140-
async with _sem:
143+
async with _file_transfer_sem:
141144
await s3.download_file(
142145
Bucket=self.bucket,
143146
Key=object_name,

0 commit comments

Comments
 (0)