Skip to content

Commit f7e5543

Browse files
committed
Use shared async client to limit simultaneous connections
1 parent 403c2df commit f7e5543

File tree

5 files changed: +27 -25 lines changed

5 files changed: +27 -25 lines changed

examples/bigger_uproot.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
spec = ServiceXSpec(
3636
General=General(
37-
ServiceX="testing1",
37+
ServiceX="servicex-uc-af",
3838
Codegen="uproot",
3939
OutputFormat="parquet",
4040
Delivery="LocalCache"

examples/single_file_uproot.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,15 @@
4343
"""
4444
query_ast = ast.parse(qstr)
4545
qastle_query = qastle.python_ast_to_text_ast(qastle.insert_linq_nodes(query_ast))
46-
print("From str", qastle_query)
4746
q2 = FuncADLQuery()
4847
q2.set_provided_qastle(qastle_query)
4948
print(q2.generate_selection_string())
50-
print("From python", query.generate_selection_string())
5149
spec = ServiceXSpec(
5250
General=General(
53-
ServiceX="testing1",
51+
ServiceX="servicex-uc-af",
5452
Codegen="uproot",
5553
OutputFormat="parquet",
56-
Delivery="LocalCache"
54+
Delivery="SignedURLs"
5755
),
5856
Sample=[
5957
Sample(

servicex/query.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -407,9 +407,20 @@ async def get_signed_url(
407407
if progress:
408408
progress.advance(task_id=download_progress, task_type="Download")
409409

410-
while True:
410+
transform_is_complete = False
411+
while not transform_is_complete:
411412
if not cached_record:
412413
await asyncio.sleep(self.minio_polling_interval)
414+
415+
# Once the transform is complete we can stop polling since all the files
416+
# are guaranteed to be in the bucket. Also, if we are just downloading or
417+
# signing urls for a previous transform then we know it is complete as well
418+
if cached_record or (
419+
self.current_status and self.current_status.status == Status.complete
420+
):
421+
transform_is_complete = True
422+
await asyncio.sleep(10)
423+
413424
if self.minio:
414425
async with download_semaphore:
415426
files = await self.minio.list_bucket()
@@ -443,13 +454,6 @@ async def get_signed_url(
443454
) # NOQA 501
444455
files_seen.add(file.filename)
445456

446-
# Once the transform is complete we can stop polling since all the files
447-
# are guaranteed to be in the bucket. Also, if we are just downloading or
448-
# signing urls for a previous transform then we know it is complete as well
449-
if cached_record or (
450-
self.current_status and self.current_status.status == Status.complete
451-
):
452-
break
453457

454458
# Now just wait until all of our tasks complete
455459
await asyncio.gather(*download_tasks)

servicex/servicex_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ def get_codegen(_sample: Sample, _general: General):
5858
return _general.Codegen
5959

6060

61-
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=4)) as client:
62-
download_semaphore = asyncio.Semaphore(10)
61+
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=4), timeout=None) as client:
62+
download_semaphore = asyncio.Semaphore(20)
6363
servicex_semaphore = asyncio.Semaphore(4)
6464

6565
sx = ServiceXClient(backend=config.General.ServiceX,
@@ -111,11 +111,11 @@ def get_codegen(_sample: Sample, _general: General):
111111
group = DatasetGroup(datasets, download_semaphore=download_semaphore)
112112

113113
if config.General.Delivery == General.DeliveryEnum.LocalCache:
114-
results = group.as_files()
115-
return {obj.title: obj.signed_url_list for obj in results}
116-
elif config.General.Delivery == General.DeliveryEnum.SignedURLs:
117-
results = group.as_signed_urls()
114+
results = await group.as_files_async()
118115
return {obj.title: obj.file_list for obj in results}
116+
elif config.General.Delivery == General.DeliveryEnum.SignedURLs:
117+
results = await group.as_signed_urls_async()
118+
return {obj.title: obj.signed_url_list for obj in results}
119119

120120
deliver = make_sync(deliver_async)
121121

tests/test_servicex_adapter.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def test_result_formats():
5757
@patch('servicex.servicex_adapter.httpx.AsyncClient.get')
5858
async def test_get_transforms(get, servicex, transform_status_response):
5959
get.return_value = httpx.Response(200, json=transform_status_response)
60-
t = await servicex.get_transforms()
60+
t = await servicex.get_transforms(self.servicex_semaphore)
6161
assert len(t) == 1
6262
assert t[0].request_id == "b8c508d0-ccf2-4deb-a1f7-65c839eebabf"
6363
get.assert_called_with(url='https://servicex.org/servicex/transformation', headers={})
@@ -68,7 +68,7 @@ async def test_get_transforms(get, servicex, transform_status_response):
6868
async def test_get_transforms_auth_error(get, servicex):
6969
with pytest.raises(AuthorizationError):
7070
get.return_value = httpx.Response(401)
71-
await servicex.get_transforms()
71+
await servicex.get_transforms(self.servicex_semaphore)
7272

7373

7474
@pytest.mark.asyncio
@@ -85,12 +85,12 @@ async def test_get_transforms_wlcg_bearer_token(decode, post, get, servicex,
8585

8686
get.return_value = httpx.Response(200, json=transform_status_response)
8787
decode.return_value = {'exp': math.inf}
88-
await servicex.get_transforms()
88+
await servicex.get_transforms(self.servicex_semaphore)
8989

9090
# Try with an expired token
9191
with pytest.raises(AuthorizationError):
9292
decode.return_value = {'exp': 0.0}
93-
await servicex.get_transforms()
93+
await servicex.get_transforms(self.servicex_semaphore)
9494

9595
os.remove(token_file.name)
9696
del os.environ['BEARER_TOKEN_FILE']
@@ -103,7 +103,7 @@ async def test_get_transforms_with_refresh(get, post, transform_status_response)
103103
servicex = ServiceXAdapter(url="https://servicex.org", refresh_token="refrescas")
104104
post.return_value = httpx.Response(200, json={"access_token": "luckycharms"})
105105
get.return_value = httpx.Response(200, json=transform_status_response)
106-
await servicex.get_transforms()
106+
await servicex.get_transforms(self.servicex_semaphore)
107107

108108
post.assert_called_with('https://servicex.org/token/refresh',
109109
headers={'Authorization': 'Bearer refrescas'}, json=None)
@@ -118,7 +118,7 @@ def test_get_codegens(get, servicex):
118118
"uproot": "http://uproot-codegen",
119119
"xaod": "http://xaod-codegen"
120120
})
121-
c = servicex.get_code_generators()
121+
c = servicex.get_code_generators(self.servicex_semaphore)
122122
assert len(c) == 2
123123
assert c["uproot"] == "http://uproot-codegen"
124124

0 commit comments

Comments (0)