Skip to content

Change from aiohttp to httpx #598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ dependencies = [
"func_adl>=3.2.6",
"requests>=2.31",
"pydantic>=2.6.0",
"aiohttp-retry>=2.8.3",
"httpx>=0.24",
"httpx_retries>=0.3.2",
"aioboto3>=14.1.0",
"tinydb>=4.7",
"google-auth>=2.17",
Expand Down
270 changes: 134 additions & 136 deletions servicex/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import time
from typing import Optional, Dict, List

from aiohttp import ClientSession
import httpx
from aiohttp_retry import RetryClient, ExponentialRetry, ClientResponse
from aiohttp import ContentTypeError
from httpx import AsyncClient as ClientSession, Response
Copy link
Preview

Copilot AI May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Aliasing httpx.AsyncClient as ClientSession can be confusing given its similarity to aiohttp; consider using a distinct alias like AsyncClient for clarity.

Suggested change
from httpx import AsyncClient as ClientSession, Response
from httpx import AsyncClient, Response

Copilot uses AI. Check for mistakes.

from json import JSONDecodeError
from httpx_retries import RetryTransport, Retry
from google.auth import jwt
from tenacity import (
AsyncRetrying,
Expand All @@ -48,12 +48,12 @@ class AuthorizationError(BaseException):
pass


async def _extract_message(r: ClientResponse):
async def _extract_message(r: Response):
try:
o = await r.json()
error_message = o.get("message", str(r))
except ContentTypeError:
error_message = await r.text()
except JSONDecodeError:
error_message = r.text
return error_message


Expand All @@ -66,15 +66,15 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
async def _get_token(self):
url = f"{self.url}/token/refresh"
headers = {"Authorization": f"Bearer {self.refresh_token}"}
async with RetryClient() as client:
async with client.post(url, headers=headers, json=None) as r:
if r.status == 200:
o = await r.json()
self.token = o["access_token"]
else:
raise AuthorizationError(
f"ServiceX access token request rejected [{r.status} {r.reason}]"
)
async with ClientSession() as client:
r = await client.post(url, headers=headers, json=None)
if r.status_code == 200:
o = await r.json()
self.token = o["access_token"]
else:
raise AuthorizationError(
f"ServiceX access token request rejected [{r.status_code} {r.reason_phrase}]"
)

@staticmethod
def _get_bearer_token_file():
Expand Down Expand Up @@ -112,23 +112,25 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str]

async def get_transforms(self) -> List[TransformStatus]:
headers = await self._get_authorization()
retry_options = ExponentialRetry(attempts=3, start_timeout=10)
async with RetryClient(retry_options=retry_options) as client:
async with client.get(
retry_options = Retry(total=3, backoff_factor=10)
async with ClientSession(
transport=RetryTransport(retry=retry_options)
) as client:
r = await client.get(
url=f"{self.url}/servicex/transformation", headers=headers
) as r:
if r.status == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status} - {error_message}"
)
o = await r.json()
statuses = [TransformStatus(**status) for status in o["requests"]]
)
if r.status_code == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status_code} - {error_message}"
Copy link
Preview

Copilot AI May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error raised in get_transforms mentions 'submission' but this method retrieves transforms; update the message to reflect retrieval (e.g., 'retrieval').

Suggested change
f"submission: {r.status_code} - {error_message}"
f"retrieval: {r.status_code} - {error_message}"

Copilot uses AI. Check for mistakes.

)
o = await r.json()
statuses = [TransformStatus(**status) for status in o["requests"]]
return statuses

def get_code_generators(self):
Expand All @@ -150,19 +152,18 @@ async def get_datasets(
params["show-deleted"] = True

async with ClientSession() as session:
async with session.get(
r = await session.get(
headers=headers, url=f"{self.url}/servicex/datasets", params=params
) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to get datasets: {r.status} - {msg}")
)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to get datasets: {r.status_code} - {msg}")

result = await r.json()
result = await r.json()

datasets = [CachedDataset(**d) for d in result["datasets"]]
return datasets
Expand All @@ -172,112 +173,108 @@ async def get_dataset(self, dataset_id=None) -> CachedDataset:
path_template = "/servicex/datasets/{dataset_id}"
url = self.url + path_template.format(dataset_id=dataset_id)
async with ClientSession() as session:
async with session.get(headers=headers, url=url) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 404:
raise ValueError(f"Dataset {dataset_id} not found")
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
result = await r.json()
r = await session.get(headers=headers, url=url)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 404:
raise ValueError(f"Dataset {dataset_id} not found")
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}")
result = await r.json()

dataset = CachedDataset(**result)
return dataset
dataset = CachedDataset(**result)
return dataset

async def delete_dataset(self, dataset_id=None) -> bool:
headers = await self._get_authorization()
path_template = "/servicex/datasets/{dataset_id}"
url = self.url + path_template.format(dataset_id=dataset_id)

async with ClientSession() as session:
async with session.delete(headers=headers, url=url) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 404:
raise ValueError(f"Dataset {dataset_id} not found")
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
result = await r.json()
return result["stale"]
r = await session.delete(headers=headers, url=url)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 404:
raise ValueError(f"Dataset {dataset_id} not found")
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}")
result = await r.json()
return result["stale"]

async def delete_transform(self, transform_id=None):
headers = await self._get_authorization()
path_template = f"/servicex/transformation/{transform_id}"
url = self.url + path_template.format(transform_id=transform_id)

async with ClientSession() as session:
async with session.delete(headers=headers, url=url) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(
f"Failed to delete transform {transform_id} - {msg}"
)
r = await session.delete(headers=headers, url=url)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}")

async def cancel_transform(self, transform_id=None):
headers = await self._get_authorization()
path_template = f"/servicex/transformation/{transform_id}/cancel"
url = self.url + path_template.format(transform_id=transform_id)

async with ClientSession() as session:
async with session.get(headers=headers, url=url) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(
f"Failed to cancel transform {transform_id} - {msg}"
)
r = await session.get(headers=headers, url=url)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to cancel transform {transform_id} - {msg}")

async def submit_transform(self, transform_request: TransformRequest) -> str:
headers = await self._get_authorization()
retry_options = ExponentialRetry(attempts=3, start_timeout=30)
async with RetryClient(retry_options=retry_options) as client:
async with client.post(
retry_options = Retry(total=3, backoff_factor=30)
async with ClientSession(
transport=RetryTransport(retry=retry_options)
) as client:
r = await client.post(
url=f"{self.url}/servicex/transformation",
headers=headers,
json=transform_request.model_dump(by_alias=True, exclude_none=True),
) as r:
if r.status == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 400:
message = await _extract_message(r)
raise ValueError(f"Invalid transform request: {message}")
elif r.status > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status} - {error_message}"
)
else:
o = await r.json()
return o["request_id"]
)
if r.status_code == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 400:
message = await _extract_message(r)
raise ValueError(f"Invalid transform request: {message}")
elif r.status_code > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status_code} - {error_message}"
)
else:
o = await r.json()
return o["request_id"]

async def get_transform_status(self, request_id: str) -> TransformStatus:
headers = await self._get_authorization()
retry_options = ExponentialRetry(attempts=5, start_timeout=3)
async with RetryClient(retry_options=retry_options) as client:
retry_options = Retry(total=5, backoff_factor=3)
async with ClientSession(
transport=RetryTransport(retry=retry_options)
) as client:
try:
async for attempt in AsyncRetrying(
retry=retry_if_not_exception_type(ValueError),
Expand All @@ -286,28 +283,29 @@ async def get_transform_status(self, request_id: str) -> TransformStatus:
reraise=True,
):
with attempt:
async with client.get(
r = await client.get(
url=f"{self.url}/servicex/" f"transformation/{request_id}",
headers=headers,
) as r:
if r.status == 401:
# perhaps we just ran out of auth validity the last time?
# refetch auth then raise an error for retry
headers = await self._get_authorization(True)
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
if r.status == 404:
raise ValueError(f"Transform ID {request_id} not found")
elif r.status > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation: "
f"{r.status} - {error_message}"
)
o = await r.json()
return TransformStatus(**o)
)
if r.status_code == 401:
# perhaps we just ran out of auth validity the last time?
# refetch auth then raise an error for retry
headers = await self._get_authorization(True)
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
if r.status_code == 404:
raise ValueError(f"Transform ID {request_id} not found")
elif r.status_code > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation: "
f"{r.status_code} - {error_message}"
)
o = await r.json()
return TransformStatus(**o)
except RuntimeError as e:
raise RuntimeError(
"ServiceX WebAPI Error " f"while getting transform status: {e}"
)
raise RuntimeError("ServiceX WebAPI: unable to retrieve transform status")
Loading
Loading