Skip to content

Commit 8f95a15

Browse files
authored
Change from aiohttp to httpx (#598)
1 parent c0496dd commit 8f95a15

File tree

3 files changed

+251
-258
lines changed

3 files changed

+251
-258
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ dependencies = [
3636
"func_adl>=3.2.6",
3737
"requests>=2.31",
3838
"pydantic>=2.6.0",
39-
"aiohttp-retry>=2.8.3",
4039
"httpx>=0.24",
40+
"httpx_retries>=0.3.2",
4141
"aioboto3>=14.1.0",
4242
"tinydb>=4.7",
4343
"google-auth>=2.17",

servicex/servicex_adapter.py

Lines changed: 136 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import time
3030
from typing import Optional, Dict, List
3131

32-
from aiohttp import ClientSession
3332
import httpx
34-
from aiohttp_retry import RetryClient, ExponentialRetry, ClientResponse
35-
from aiohttp import ContentTypeError
33+
from httpx import AsyncClient, Response
34+
from json import JSONDecodeError
35+
from httpx_retries import RetryTransport, Retry
3636
from google.auth import jwt
3737
from tenacity import (
3838
AsyncRetrying,
@@ -48,12 +48,12 @@ class AuthorizationError(BaseException):
4848
pass
4949

5050

51-
async def _extract_message(r: ClientResponse):
51+
async def _extract_message(r: Response):
5252
try:
53-
o = await r.json()
53+
o = r.json()
5454
error_message = o.get("message", str(r))
55-
except ContentTypeError:
56-
error_message = await r.text()
55+
except JSONDecodeError:
56+
error_message = r.text
5757
return error_message
5858

5959

@@ -66,15 +66,15 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
6666
async def _get_token(self):
6767
url = f"{self.url}/token/refresh"
6868
headers = {"Authorization": f"Bearer {self.refresh_token}"}
69-
async with RetryClient() as client:
70-
async with client.post(url, headers=headers, json=None) as r:
71-
if r.status == 200:
72-
o = await r.json()
73-
self.token = o["access_token"]
74-
else:
75-
raise AuthorizationError(
76-
f"ServiceX access token request rejected [{r.status} {r.reason}]"
77-
)
69+
async with AsyncClient() as client:
70+
r = await client.post(url, headers=headers, json=None)
71+
if r.status_code == 200:
72+
o = r.json()
73+
self.token = o["access_token"]
74+
else:
75+
raise AuthorizationError(
76+
f"ServiceX access token request rejected [{r.status_code} {r.reason_phrase}]"
77+
)
7878

7979
@staticmethod
8080
def _get_bearer_token_file():
@@ -122,23 +122,23 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str]
122122

123123
async def get_transforms(self) -> List[TransformStatus]:
124124
headers = await self._get_authorization()
125-
retry_options = ExponentialRetry(attempts=3, start_timeout=10)
126-
async with RetryClient(retry_options=retry_options) as client:
127-
async with client.get(
125+
retry_options = Retry(total=3, backoff_factor=10)
126+
async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client:
127+
r = await client.get(
128128
url=f"{self.url}/servicex/transformation", headers=headers
129-
) as r:
130-
if r.status == 401:
131-
raise AuthorizationError(
132-
f"Not authorized to access serviceX at {self.url}"
133-
)
134-
elif r.status > 400:
135-
error_message = await _extract_message(r)
136-
raise RuntimeError(
137-
"ServiceX WebAPI Error during transformation "
138-
f"submission: {r.status} - {error_message}"
139-
)
140-
o = await r.json()
141-
statuses = [TransformStatus(**status) for status in o["requests"]]
129+
)
130+
if r.status_code == 401:
131+
raise AuthorizationError(
132+
f"Not authorized to access serviceX at {self.url}"
133+
)
134+
elif r.status_code > 400:
135+
error_message = await _extract_message(r)
136+
raise RuntimeError(
137+
"ServiceX WebAPI Error during transformation "
138+
f"status retrieval: {r.status_code} - {error_message}"
139+
)
140+
o = r.json()
141+
statuses = [TransformStatus(**status) for status in o["requests"]]
142142
return statuses
143143

144144
def get_code_generators(self):
@@ -159,20 +159,19 @@ async def get_datasets(
159159
if show_deleted:
160160
params["show-deleted"] = True
161161

162-
async with ClientSession() as session:
163-
async with session.get(
162+
async with AsyncClient() as session:
163+
r = await session.get(
164164
headers=headers, url=f"{self.url}/servicex/datasets", params=params
165-
) as r:
166-
167-
if r.status == 403:
168-
raise AuthorizationError(
169-
f"Not authorized to access serviceX at {self.url}"
170-
)
171-
elif r.status != 200:
172-
msg = await _extract_message(r)
173-
raise RuntimeError(f"Failed to get datasets: {r.status} - {msg}")
165+
)
166+
if r.status_code == 403:
167+
raise AuthorizationError(
168+
f"Not authorized to access serviceX at {self.url}"
169+
)
170+
elif r.status_code != 200:
171+
msg = await _extract_message(r)
172+
raise RuntimeError(f"Failed to get datasets: {r.status_code} - {msg}")
174173

175-
result = await r.json()
174+
result = r.json()
176175

177176
datasets = [CachedDataset(**d) for d in result["datasets"]]
178177
return datasets
@@ -181,113 +180,105 @@ async def get_dataset(self, dataset_id=None) -> CachedDataset:
181180
headers = await self._get_authorization()
182181
path_template = "/servicex/datasets/{dataset_id}"
183182
url = self.url + path_template.format(dataset_id=dataset_id)
184-
async with ClientSession() as session:
185-
async with session.get(headers=headers, url=url) as r:
186-
187-
if r.status == 403:
188-
raise AuthorizationError(
189-
f"Not authorized to access serviceX at {self.url}"
190-
)
191-
elif r.status == 404:
192-
raise ValueError(f"Dataset {dataset_id} not found")
193-
elif r.status != 200:
194-
msg = await _extract_message(r)
195-
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
196-
result = await r.json()
183+
async with AsyncClient() as session:
184+
r = await session.get(headers=headers, url=url)
185+
if r.status_code == 403:
186+
raise AuthorizationError(
187+
f"Not authorized to access serviceX at {self.url}"
188+
)
189+
elif r.status_code == 404:
190+
raise ValueError(f"Dataset {dataset_id} not found")
191+
elif r.status_code != 200:
192+
msg = await _extract_message(r)
193+
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
194+
result = r.json()
197195

198-
dataset = CachedDataset(**result)
199-
return dataset
196+
dataset = CachedDataset(**result)
197+
return dataset
200198

201199
async def delete_dataset(self, dataset_id=None) -> bool:
202200
headers = await self._get_authorization()
203201
path_template = "/servicex/datasets/{dataset_id}"
204202
url = self.url + path_template.format(dataset_id=dataset_id)
205203

206-
async with ClientSession() as session:
207-
async with session.delete(headers=headers, url=url) as r:
208-
209-
if r.status == 403:
210-
raise AuthorizationError(
211-
f"Not authorized to access serviceX at {self.url}"
212-
)
213-
elif r.status == 404:
214-
raise ValueError(f"Dataset {dataset_id} not found")
215-
elif r.status != 200:
216-
msg = await _extract_message(r)
217-
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
218-
result = await r.json()
219-
return result["stale"]
204+
async with AsyncClient() as session:
205+
r = await session.delete(headers=headers, url=url)
206+
if r.status_code == 403:
207+
raise AuthorizationError(
208+
f"Not authorized to access serviceX at {self.url}"
209+
)
210+
elif r.status_code == 404:
211+
raise ValueError(f"Dataset {dataset_id} not found")
212+
elif r.status_code != 200:
213+
msg = await _extract_message(r)
214+
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
215+
result = r.json()
216+
return result["stale"]
220217

221218
async def delete_transform(self, transform_id=None):
222219
headers = await self._get_authorization()
223220
path_template = f"/servicex/transformation/{transform_id}"
224221
url = self.url + path_template.format(transform_id=transform_id)
225222

226-
async with ClientSession() as session:
227-
async with session.delete(headers=headers, url=url) as r:
228-
229-
if r.status == 403:
230-
raise AuthorizationError(
231-
f"Not authorized to access serviceX at {self.url}"
232-
)
233-
elif r.status == 404:
234-
raise ValueError(f"Transform {transform_id} not found")
235-
elif r.status != 200:
236-
msg = await _extract_message(r)
237-
raise RuntimeError(
238-
f"Failed to delete transform {transform_id} - {msg}"
239-
)
223+
async with AsyncClient() as session:
224+
r = await session.delete(headers=headers, url=url)
225+
if r.status_code == 403:
226+
raise AuthorizationError(
227+
f"Not authorized to access serviceX at {self.url}"
228+
)
229+
elif r.status_code == 404:
230+
raise ValueError(f"Transform {transform_id} not found")
231+
elif r.status_code != 200:
232+
msg = await _extract_message(r)
233+
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}")
240234

241235
async def cancel_transform(self, transform_id=None):
242236
headers = await self._get_authorization()
243237
path_template = f"/servicex/transformation/{transform_id}/cancel"
244238
url = self.url + path_template.format(transform_id=transform_id)
245239

246-
async with ClientSession() as session:
247-
async with session.get(headers=headers, url=url) as r:
248-
249-
if r.status == 403:
250-
raise AuthorizationError(
251-
f"Not authorized to access serviceX at {self.url}"
252-
)
253-
elif r.status == 404:
254-
raise ValueError(f"Transform {transform_id} not found")
255-
elif r.status != 200:
256-
msg = await _extract_message(r)
257-
raise RuntimeError(
258-
f"Failed to cancel transform {transform_id} - {msg}"
259-
)
240+
async with AsyncClient() as session:
241+
r = await session.get(headers=headers, url=url)
242+
if r.status_code == 403:
243+
raise AuthorizationError(
244+
f"Not authorized to access serviceX at {self.url}"
245+
)
246+
elif r.status_code == 404:
247+
raise ValueError(f"Transform {transform_id} not found")
248+
elif r.status_code != 200:
249+
msg = await _extract_message(r)
250+
raise RuntimeError(f"Failed to cancel transform {transform_id} - {msg}")
260251

261252
async def submit_transform(self, transform_request: TransformRequest) -> str:
262253
headers = await self._get_authorization()
263-
retry_options = ExponentialRetry(attempts=3, start_timeout=30)
264-
async with RetryClient(retry_options=retry_options) as client:
265-
async with client.post(
254+
retry_options = Retry(total=3, backoff_factor=30)
255+
async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client:
256+
r = await client.post(
266257
url=f"{self.url}/servicex/transformation",
267258
headers=headers,
268259
json=transform_request.model_dump(by_alias=True, exclude_none=True),
269-
) as r:
270-
if r.status == 401:
271-
raise AuthorizationError(
272-
f"Not authorized to access serviceX at {self.url}"
273-
)
274-
elif r.status == 400:
275-
message = await _extract_message(r)
276-
raise ValueError(f"Invalid transform request: {message}")
277-
elif r.status > 400:
278-
error_message = await _extract_message(r)
279-
raise RuntimeError(
280-
"ServiceX WebAPI Error during transformation "
281-
f"submission: {r.status} - {error_message}"
282-
)
283-
else:
284-
o = await r.json()
285-
return o["request_id"]
260+
)
261+
if r.status_code == 401:
262+
raise AuthorizationError(
263+
f"Not authorized to access serviceX at {self.url}"
264+
)
265+
elif r.status_code == 400:
266+
message = await _extract_message(r)
267+
raise ValueError(f"Invalid transform request: {message}")
268+
elif r.status_code > 400:
269+
error_message = await _extract_message(r)
270+
raise RuntimeError(
271+
"ServiceX WebAPI Error during transformation "
272+
f"submission: {r.status_code} - {error_message}"
273+
)
274+
else:
275+
o = r.json()
276+
return o["request_id"]
286277

287278
async def get_transform_status(self, request_id: str) -> TransformStatus:
288279
headers = await self._get_authorization()
289-
retry_options = ExponentialRetry(attempts=5, start_timeout=3)
290-
async with RetryClient(retry_options=retry_options) as client:
280+
retry_options = Retry(total=5, backoff_factor=3)
281+
async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client:
291282
try:
292283
async for attempt in AsyncRetrying(
293284
retry=retry_if_not_exception_type(ValueError),
@@ -296,28 +287,31 @@ async def get_transform_status(self, request_id: str) -> TransformStatus:
296287
reraise=True,
297288
):
298289
with attempt:
299-
async with client.get(
290+
r = await client.get(
300291
url=f"{self.url}/servicex/" f"transformation/{request_id}",
301292
headers=headers,
302-
) as r:
303-
if r.status == 401:
304-
# perhaps we just ran out of auth validity the last time?
305-
# refetch auth then raise an error for retry
306-
headers = await self._get_authorization(True)
307-
raise AuthorizationError(
308-
f"Not authorized to access serviceX at {self.url}"
309-
)
310-
if r.status == 404:
311-
raise ValueError(f"Transform ID {request_id} not found")
312-
elif r.status > 400:
313-
error_message = await _extract_message(r)
314-
raise RuntimeError(
315-
"ServiceX WebAPI Error during transformation: "
316-
f"{r.status} - {error_message}"
317-
)
318-
o = await r.json()
319-
return TransformStatus(**o)
293+
)
294+
if r.status_code == 401:
295+
# perhaps we just ran out of auth validity the last time?
296+
# refetch auth then raise an error for retry
297+
headers = await self._get_authorization(True)
298+
raise AuthorizationError(
299+
f"Not authorized to access serviceX at {self.url}"
300+
)
301+
if r.status_code == 404:
302+
raise ValueError(f"Transform ID {request_id} not found")
303+
elif r.status_code > 400:
304+
error_message = await _extract_message(r)
305+
raise RuntimeError(
306+
"ServiceX WebAPI Error during transformation: "
307+
f"{r.status_code} - {error_message}"
308+
)
309+
o = r.json()
310+
return TransformStatus(**o)
320311
except RuntimeError as e:
321312
raise RuntimeError(
322313
"ServiceX WebAPI Error " f"while getting transform status: {e}"
323314
)
315+
raise RuntimeError(
316+
"ServiceX WebAPI: unable to retrieve transform status"
317+
) # pragma: no cover

0 commit comments

Comments
 (0)