Skip to content

Commit 5d02216

Browse files
committed
Change from aiohttp to httpx
1 parent 2e24a67 commit 5d02216

File tree

3 files changed

+210
-236
lines changed

3 files changed

+210
-236
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ dependencies = [
3636
"func_adl>=3.2.6",
3737
"requests>=2.31",
3838
"pydantic>=2.6.0",
39-
"aiohttp-retry>=2.8.3",
4039
"httpx>=0.24",
40+
"httpx_retries>=0.3.2",
4141
"aioboto3>=14.1.0",
4242
"tinydb>=4.7",
4343
"google-auth>=2.17",

servicex/servicex_adapter.py

Lines changed: 134 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import time
3030
from typing import Optional, Dict, List
3131

32-
from aiohttp import ClientSession
3332
import httpx
34-
from aiohttp_retry import RetryClient, ExponentialRetry, ClientResponse
35-
from aiohttp import ContentTypeError
33+
from httpx import AsyncClient as ClientSession, Response
34+
from json import JSONDecodeError
35+
from httpx_retries import RetryTransport, Retry
3636
from google.auth import jwt
3737
from tenacity import (
3838
AsyncRetrying,
@@ -48,12 +48,12 @@ class AuthorizationError(BaseException):
4848
pass
4949

5050

51-
async def _extract_message(r: ClientResponse):
51+
async def _extract_message(r: Response):
5252
try:
5353
o = await r.json()
5454
error_message = o.get("message", str(r))
55-
except ContentTypeError:
56-
error_message = await r.text()
55+
except JSONDecodeError:
56+
error_message = r.text
5757
return error_message
5858

5959

@@ -66,15 +66,15 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
6666
async def _get_token(self):
6767
url = f"{self.url}/token/refresh"
6868
headers = {"Authorization": f"Bearer {self.refresh_token}"}
69-
async with RetryClient() as client:
70-
async with client.post(url, headers=headers, json=None) as r:
71-
if r.status == 200:
72-
o = await r.json()
73-
self.token = o["access_token"]
74-
else:
75-
raise AuthorizationError(
76-
f"ServiceX access token request rejected [{r.status} {r.reason}]"
77-
)
69+
async with ClientSession() as client:
70+
r = await client.post(url, headers=headers, json=None)
71+
if r.status_code == 200:
72+
o = await r.json()
73+
self.token = o["access_token"]
74+
else:
75+
raise AuthorizationError(
76+
f"ServiceX access token request rejected [{r.status_code} {r.reason_phrase}]"
77+
)
7878

7979
@staticmethod
8080
def _get_bearer_token_file():
@@ -112,23 +112,25 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str]
112112

113113
async def get_transforms(self) -> List[TransformStatus]:
114114
headers = await self._get_authorization()
115-
retry_options = ExponentialRetry(attempts=3, start_timeout=10)
116-
async with RetryClient(retry_options=retry_options) as client:
117-
async with client.get(
115+
retry_options = Retry(total=3, backoff_factor=10)
116+
async with ClientSession(
117+
transport=RetryTransport(retry=retry_options)
118+
) as client:
119+
r = await client.get(
118120
url=f"{self.url}/servicex/transformation", headers=headers
119-
) as r:
120-
if r.status == 401:
121-
raise AuthorizationError(
122-
f"Not authorized to access serviceX at {self.url}"
123-
)
124-
elif r.status > 400:
125-
error_message = await _extract_message(r)
126-
raise RuntimeError(
127-
"ServiceX WebAPI Error during transformation "
128-
f"submission: {r.status} - {error_message}"
129-
)
130-
o = await r.json()
131-
statuses = [TransformStatus(**status) for status in o["requests"]]
121+
)
122+
if r.status_code == 401:
123+
raise AuthorizationError(
124+
f"Not authorized to access serviceX at {self.url}"
125+
)
126+
elif r.status_code > 400:
127+
error_message = await _extract_message(r)
128+
raise RuntimeError(
129+
"ServiceX WebAPI Error during transformation "
130+
f"submission: {r.status_code} - {error_message}"
131+
)
132+
o = await r.json()
133+
statuses = [TransformStatus(**status) for status in o["requests"]]
132134
return statuses
133135

134136
def get_code_generators(self):
@@ -150,19 +152,18 @@ async def get_datasets(
150152
params["show-deleted"] = True
151153

152154
async with ClientSession() as session:
153-
async with session.get(
155+
r = await session.get(
154156
headers=headers, url=f"{self.url}/servicex/datasets", params=params
155-
) as r:
156-
157-
if r.status == 403:
158-
raise AuthorizationError(
159-
f"Not authorized to access serviceX at {self.url}"
160-
)
161-
elif r.status != 200:
162-
msg = await _extract_message(r)
163-
raise RuntimeError(f"Failed to get datasets: {r.status} - {msg}")
157+
)
158+
if r.status_code == 403:
159+
raise AuthorizationError(
160+
f"Not authorized to access serviceX at {self.url}"
161+
)
162+
elif r.status_code != 200:
163+
msg = await _extract_message(r)
164+
raise RuntimeError(f"Failed to get datasets: {r.status_code} - {msg}")
164165

165-
result = await r.json()
166+
result = await r.json()
166167

167168
datasets = [CachedDataset(**d) for d in result["datasets"]]
168169
return datasets
@@ -172,112 +173,108 @@ async def get_dataset(self, dataset_id=None) -> CachedDataset:
172173
path_template = "/servicex/datasets/{dataset_id}"
173174
url = self.url + path_template.format(dataset_id=dataset_id)
174175
async with ClientSession() as session:
175-
async with session.get(headers=headers, url=url) as r:
176-
177-
if r.status == 403:
178-
raise AuthorizationError(
179-
f"Not authorized to access serviceX at {self.url}"
180-
)
181-
elif r.status == 404:
182-
raise ValueError(f"Dataset {dataset_id} not found")
183-
elif r.status != 200:
184-
msg = await _extract_message(r)
185-
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
186-
result = await r.json()
176+
r = await session.get(headers=headers, url=url)
177+
if r.status_code == 403:
178+
raise AuthorizationError(
179+
f"Not authorized to access serviceX at {self.url}"
180+
)
181+
elif r.status_code == 404:
182+
raise ValueError(f"Dataset {dataset_id} not found")
183+
elif r.status_code != 200:
184+
msg = await _extract_message(r)
185+
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
186+
result = await r.json()
187187

188-
dataset = CachedDataset(**result)
189-
return dataset
188+
dataset = CachedDataset(**result)
189+
return dataset
190190

191191
async def delete_dataset(self, dataset_id=None) -> bool:
192192
headers = await self._get_authorization()
193193
path_template = "/servicex/datasets/{dataset_id}"
194194
url = self.url + path_template.format(dataset_id=dataset_id)
195195

196196
async with ClientSession() as session:
197-
async with session.delete(headers=headers, url=url) as r:
198-
199-
if r.status == 403:
200-
raise AuthorizationError(
201-
f"Not authorized to access serviceX at {self.url}"
202-
)
203-
elif r.status == 404:
204-
raise ValueError(f"Dataset {dataset_id} not found")
205-
elif r.status != 200:
206-
msg = await _extract_message(r)
207-
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
208-
result = await r.json()
209-
return result["stale"]
197+
r = await session.delete(headers=headers, url=url)
198+
if r.status_code == 403:
199+
raise AuthorizationError(
200+
f"Not authorized to access serviceX at {self.url}"
201+
)
202+
elif r.status_code == 404:
203+
raise ValueError(f"Dataset {dataset_id} not found")
204+
elif r.status_code != 200:
205+
msg = await _extract_message(r)
206+
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
207+
result = await r.json()
208+
return result["stale"]
210209

211210
async def delete_transform(self, transform_id=None):
212211
headers = await self._get_authorization()
213212
path_template = f"/servicex/transformation/{transform_id}"
214213
url = self.url + path_template.format(transform_id=transform_id)
215214

216215
async with ClientSession() as session:
217-
async with session.delete(headers=headers, url=url) as r:
218-
219-
if r.status == 403:
220-
raise AuthorizationError(
221-
f"Not authorized to access serviceX at {self.url}"
222-
)
223-
elif r.status == 404:
224-
raise ValueError(f"Transform {transform_id} not found")
225-
elif r.status != 200:
226-
msg = await _extract_message(r)
227-
raise RuntimeError(
228-
f"Failed to delete transform {transform_id} - {msg}"
229-
)
216+
r = await session.delete(headers=headers, url=url)
217+
if r.status_code == 403:
218+
raise AuthorizationError(
219+
f"Not authorized to access serviceX at {self.url}"
220+
)
221+
elif r.status_code == 404:
222+
raise ValueError(f"Transform {transform_id} not found")
223+
elif r.status_code != 200:
224+
msg = await _extract_message(r)
225+
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}")
230226

231227
async def cancel_transform(self, transform_id=None):
232228
headers = await self._get_authorization()
233229
path_template = f"/servicex/transformation/{transform_id}/cancel"
234230
url = self.url + path_template.format(transform_id=transform_id)
235231

236232
async with ClientSession() as session:
237-
async with session.get(headers=headers, url=url) as r:
238-
239-
if r.status == 403:
240-
raise AuthorizationError(
241-
f"Not authorized to access serviceX at {self.url}"
242-
)
243-
elif r.status == 404:
244-
raise ValueError(f"Transform {transform_id} not found")
245-
elif r.status != 200:
246-
msg = await _extract_message(r)
247-
raise RuntimeError(
248-
f"Failed to cancel transform {transform_id} - {msg}"
249-
)
233+
r = await session.get(headers=headers, url=url)
234+
if r.status_code == 403:
235+
raise AuthorizationError(
236+
f"Not authorized to access serviceX at {self.url}"
237+
)
238+
elif r.status_code == 404:
239+
raise ValueError(f"Transform {transform_id} not found")
240+
elif r.status_code != 200:
241+
msg = await _extract_message(r)
242+
raise RuntimeError(f"Failed to cancel transform {transform_id} - {msg}")
250243

251244
async def submit_transform(self, transform_request: TransformRequest) -> str:
252245
headers = await self._get_authorization()
253-
retry_options = ExponentialRetry(attempts=3, start_timeout=30)
254-
async with RetryClient(retry_options=retry_options) as client:
255-
async with client.post(
246+
retry_options = Retry(total=3, backoff_factor=30)
247+
async with ClientSession(
248+
transport=RetryTransport(retry=retry_options)
249+
) as client:
250+
r = await client.post(
256251
url=f"{self.url}/servicex/transformation",
257252
headers=headers,
258253
json=transform_request.model_dump(by_alias=True, exclude_none=True),
259-
) as r:
260-
if r.status == 401:
261-
raise AuthorizationError(
262-
f"Not authorized to access serviceX at {self.url}"
263-
)
264-
elif r.status == 400:
265-
message = await _extract_message(r)
266-
raise ValueError(f"Invalid transform request: {message}")
267-
elif r.status > 400:
268-
error_message = await _extract_message(r)
269-
raise RuntimeError(
270-
"ServiceX WebAPI Error during transformation "
271-
f"submission: {r.status} - {error_message}"
272-
)
273-
else:
274-
o = await r.json()
275-
return o["request_id"]
254+
)
255+
if r.status_code == 401:
256+
raise AuthorizationError(
257+
f"Not authorized to access serviceX at {self.url}"
258+
)
259+
elif r.status_code == 400:
260+
message = await _extract_message(r)
261+
raise ValueError(f"Invalid transform request: {message}")
262+
elif r.status_code > 400:
263+
error_message = await _extract_message(r)
264+
raise RuntimeError(
265+
"ServiceX WebAPI Error during transformation "
266+
f"submission: {r.status_code} - {error_message}"
267+
)
268+
else:
269+
o = await r.json()
270+
return o["request_id"]
276271

277272
async def get_transform_status(self, request_id: str) -> TransformStatus:
278273
headers = await self._get_authorization()
279-
retry_options = ExponentialRetry(attempts=5, start_timeout=3)
280-
async with RetryClient(retry_options=retry_options) as client:
274+
retry_options = Retry(total=5, backoff_factor=3)
275+
async with ClientSession(
276+
transport=RetryTransport(retry=retry_options)
277+
) as client:
281278
try:
282279
async for attempt in AsyncRetrying(
283280
retry=retry_if_not_exception_type(ValueError),
@@ -286,28 +283,29 @@ async def get_transform_status(self, request_id: str) -> TransformStatus:
286283
reraise=True,
287284
):
288285
with attempt:
289-
async with client.get(
286+
r = await client.get(
290287
url=f"{self.url}/servicex/" f"transformation/{request_id}",
291288
headers=headers,
292-
) as r:
293-
if r.status == 401:
294-
# perhaps we just ran out of auth validity the last time?
295-
# refetch auth then raise an error for retry
296-
headers = await self._get_authorization(True)
297-
raise AuthorizationError(
298-
f"Not authorized to access serviceX at {self.url}"
299-
)
300-
if r.status == 404:
301-
raise ValueError(f"Transform ID {request_id} not found")
302-
elif r.status > 400:
303-
error_message = await _extract_message(r)
304-
raise RuntimeError(
305-
"ServiceX WebAPI Error during transformation: "
306-
f"{r.status} - {error_message}"
307-
)
308-
o = await r.json()
309-
return TransformStatus(**o)
289+
)
290+
if r.status_code == 401:
291+
# perhaps we just ran out of auth validity the last time?
292+
# refetch auth then raise an error for retry
293+
headers = await self._get_authorization(True)
294+
raise AuthorizationError(
295+
f"Not authorized to access serviceX at {self.url}"
296+
)
297+
if r.status_code == 404:
298+
raise ValueError(f"Transform ID {request_id} not found")
299+
elif r.status_code > 400:
300+
error_message = await _extract_message(r)
301+
raise RuntimeError(
302+
"ServiceX WebAPI Error during transformation: "
303+
f"{r.status_code} - {error_message}"
304+
)
305+
o = await r.json()
306+
return TransformStatus(**o)
310307
except RuntimeError as e:
311308
raise RuntimeError(
312309
"ServiceX WebAPI Error " f"while getting transform status: {e}"
313310
)
311+
raise RuntimeError("ServiceX WebAPI: unable to retrieve transform status")

0 commit comments

Comments
 (0)