-
Notifications
You must be signed in to change notification settings - Fork 13
Change from aiohttp to httpx #598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ponyisi
wants to merge
2
commits into
master
Choose a base branch
from
aiohttp-to-httpx
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -29,10 +29,10 @@ | |||||
import time | ||||||
from typing import Optional, Dict, List | ||||||
|
||||||
from aiohttp import ClientSession | ||||||
import httpx | ||||||
from aiohttp_retry import RetryClient, ExponentialRetry, ClientResponse | ||||||
from aiohttp import ContentTypeError | ||||||
from httpx import AsyncClient as ClientSession, Response | ||||||
from json import JSONDecodeError | ||||||
from httpx_retries import RetryTransport, Retry | ||||||
from google.auth import jwt | ||||||
from tenacity import ( | ||||||
AsyncRetrying, | ||||||
|
@@ -48,12 +48,12 @@ class AuthorizationError(BaseException): | |||||
pass | ||||||
|
||||||
|
||||||
async def _extract_message(r: ClientResponse): | ||||||
async def _extract_message(r: Response): | ||||||
try: | ||||||
o = await r.json() | ||||||
error_message = o.get("message", str(r)) | ||||||
except ContentTypeError: | ||||||
error_message = await r.text() | ||||||
except JSONDecodeError: | ||||||
error_message = r.text | ||||||
return error_message | ||||||
|
||||||
|
||||||
|
@@ -66,15 +66,15 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): | |||||
async def _get_token(self): | ||||||
url = f"{self.url}/token/refresh" | ||||||
headers = {"Authorization": f"Bearer {self.refresh_token}"} | ||||||
async with RetryClient() as client: | ||||||
async with client.post(url, headers=headers, json=None) as r: | ||||||
if r.status == 200: | ||||||
o = await r.json() | ||||||
self.token = o["access_token"] | ||||||
else: | ||||||
raise AuthorizationError( | ||||||
f"ServiceX access token request rejected [{r.status} {r.reason}]" | ||||||
) | ||||||
async with ClientSession() as client: | ||||||
r = await client.post(url, headers=headers, json=None) | ||||||
if r.status_code == 200: | ||||||
o = await r.json() | ||||||
self.token = o["access_token"] | ||||||
else: | ||||||
raise AuthorizationError( | ||||||
f"ServiceX access token request rejected [{r.status_code} {r.reason_phrase}]" | ||||||
) | ||||||
|
||||||
@staticmethod | ||||||
def _get_bearer_token_file(): | ||||||
|
@@ -112,23 +112,25 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str] | |||||
|
||||||
async def get_transforms(self) -> List[TransformStatus]: | ||||||
headers = await self._get_authorization() | ||||||
retry_options = ExponentialRetry(attempts=3, start_timeout=10) | ||||||
async with RetryClient(retry_options=retry_options) as client: | ||||||
async with client.get( | ||||||
retry_options = Retry(total=3, backoff_factor=10) | ||||||
async with ClientSession( | ||||||
transport=RetryTransport(retry=retry_options) | ||||||
) as client: | ||||||
r = await client.get( | ||||||
url=f"{self.url}/servicex/transformation", headers=headers | ||||||
) as r: | ||||||
if r.status == 401: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation " | ||||||
f"submission: {r.status} - {error_message}" | ||||||
) | ||||||
o = await r.json() | ||||||
statuses = [TransformStatus(**status) for status in o["requests"]] | ||||||
) | ||||||
if r.status_code == 401: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation " | ||||||
f"submission: {r.status_code} - {error_message}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error raised in
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||||||
) | ||||||
o = await r.json() | ||||||
statuses = [TransformStatus(**status) for status in o["requests"]] | ||||||
return statuses | ||||||
|
||||||
def get_code_generators(self): | ||||||
|
@@ -150,19 +152,18 @@ async def get_datasets( | |||||
params["show-deleted"] = True | ||||||
|
||||||
async with ClientSession() as session: | ||||||
async with session.get( | ||||||
r = await session.get( | ||||||
headers=headers, url=f"{self.url}/servicex/datasets", params=params | ||||||
) as r: | ||||||
|
||||||
if r.status == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to get datasets: {r.status} - {msg}") | ||||||
) | ||||||
if r.status_code == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to get datasets: {r.status_code} - {msg}") | ||||||
|
||||||
result = await r.json() | ||||||
result = await r.json() | ||||||
|
||||||
datasets = [CachedDataset(**d) for d in result["datasets"]] | ||||||
return datasets | ||||||
|
@@ -172,112 +173,108 @@ async def get_dataset(self, dataset_id=None) -> CachedDataset: | |||||
path_template = "/servicex/datasets/{dataset_id}" | ||||||
url = self.url + path_template.format(dataset_id=dataset_id) | ||||||
async with ClientSession() as session: | ||||||
async with session.get(headers=headers, url=url) as r: | ||||||
|
||||||
if r.status == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status == 404: | ||||||
raise ValueError(f"Dataset {dataset_id} not found") | ||||||
elif r.status != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}") | ||||||
result = await r.json() | ||||||
r = await session.get(headers=headers, url=url) | ||||||
if r.status_code == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code == 404: | ||||||
raise ValueError(f"Dataset {dataset_id} not found") | ||||||
elif r.status_code != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to get dataset {dataset_id} - {msg}") | ||||||
result = await r.json() | ||||||
|
||||||
dataset = CachedDataset(**result) | ||||||
return dataset | ||||||
dataset = CachedDataset(**result) | ||||||
return dataset | ||||||
|
||||||
async def delete_dataset(self, dataset_id=None) -> bool: | ||||||
headers = await self._get_authorization() | ||||||
path_template = "/servicex/datasets/{dataset_id}" | ||||||
url = self.url + path_template.format(dataset_id=dataset_id) | ||||||
|
||||||
async with ClientSession() as session: | ||||||
async with session.delete(headers=headers, url=url) as r: | ||||||
|
||||||
if r.status == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status == 404: | ||||||
raise ValueError(f"Dataset {dataset_id} not found") | ||||||
elif r.status != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}") | ||||||
result = await r.json() | ||||||
return result["stale"] | ||||||
r = await session.delete(headers=headers, url=url) | ||||||
if r.status_code == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code == 404: | ||||||
raise ValueError(f"Dataset {dataset_id} not found") | ||||||
elif r.status_code != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to delete dataset {dataset_id} - {msg}") | ||||||
result = await r.json() | ||||||
return result["stale"] | ||||||
|
||||||
async def delete_transform(self, transform_id=None): | ||||||
headers = await self._get_authorization() | ||||||
path_template = f"/servicex/transformation/{transform_id}" | ||||||
url = self.url + path_template.format(transform_id=transform_id) | ||||||
|
||||||
async with ClientSession() as session: | ||||||
async with session.delete(headers=headers, url=url) as r: | ||||||
|
||||||
if r.status == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status == 404: | ||||||
raise ValueError(f"Transform {transform_id} not found") | ||||||
elif r.status != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
f"Failed to delete transform {transform_id} - {msg}" | ||||||
) | ||||||
r = await session.delete(headers=headers, url=url) | ||||||
if r.status_code == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code == 404: | ||||||
raise ValueError(f"Transform {transform_id} not found") | ||||||
elif r.status_code != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}") | ||||||
|
||||||
async def cancel_transform(self, transform_id=None): | ||||||
headers = await self._get_authorization() | ||||||
path_template = f"/servicex/transformation/{transform_id}/cancel" | ||||||
url = self.url + path_template.format(transform_id=transform_id) | ||||||
|
||||||
async with ClientSession() as session: | ||||||
async with session.get(headers=headers, url=url) as r: | ||||||
|
||||||
if r.status == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status == 404: | ||||||
raise ValueError(f"Transform {transform_id} not found") | ||||||
elif r.status != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
f"Failed to cancel transform {transform_id} - {msg}" | ||||||
) | ||||||
r = await session.get(headers=headers, url=url) | ||||||
if r.status_code == 403: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code == 404: | ||||||
raise ValueError(f"Transform {transform_id} not found") | ||||||
elif r.status_code != 200: | ||||||
msg = await _extract_message(r) | ||||||
raise RuntimeError(f"Failed to cancel transform {transform_id} - {msg}") | ||||||
|
||||||
async def submit_transform(self, transform_request: TransformRequest) -> str: | ||||||
headers = await self._get_authorization() | ||||||
retry_options = ExponentialRetry(attempts=3, start_timeout=30) | ||||||
async with RetryClient(retry_options=retry_options) as client: | ||||||
async with client.post( | ||||||
retry_options = Retry(total=3, backoff_factor=30) | ||||||
async with ClientSession( | ||||||
transport=RetryTransport(retry=retry_options) | ||||||
) as client: | ||||||
r = await client.post( | ||||||
url=f"{self.url}/servicex/transformation", | ||||||
headers=headers, | ||||||
json=transform_request.model_dump(by_alias=True, exclude_none=True), | ||||||
) as r: | ||||||
if r.status == 401: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status == 400: | ||||||
message = await _extract_message(r) | ||||||
raise ValueError(f"Invalid transform request: {message}") | ||||||
elif r.status > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation " | ||||||
f"submission: {r.status} - {error_message}" | ||||||
) | ||||||
else: | ||||||
o = await r.json() | ||||||
return o["request_id"] | ||||||
) | ||||||
if r.status_code == 401: | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
elif r.status_code == 400: | ||||||
message = await _extract_message(r) | ||||||
raise ValueError(f"Invalid transform request: {message}") | ||||||
elif r.status_code > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation " | ||||||
f"submission: {r.status_code} - {error_message}" | ||||||
) | ||||||
else: | ||||||
o = await r.json() | ||||||
return o["request_id"] | ||||||
|
||||||
async def get_transform_status(self, request_id: str) -> TransformStatus: | ||||||
headers = await self._get_authorization() | ||||||
retry_options = ExponentialRetry(attempts=5, start_timeout=3) | ||||||
async with RetryClient(retry_options=retry_options) as client: | ||||||
retry_options = Retry(total=5, backoff_factor=3) | ||||||
async with ClientSession( | ||||||
transport=RetryTransport(retry=retry_options) | ||||||
) as client: | ||||||
try: | ||||||
async for attempt in AsyncRetrying( | ||||||
retry=retry_if_not_exception_type(ValueError), | ||||||
|
@@ -286,28 +283,29 @@ async def get_transform_status(self, request_id: str) -> TransformStatus: | |||||
reraise=True, | ||||||
): | ||||||
with attempt: | ||||||
async with client.get( | ||||||
r = await client.get( | ||||||
url=f"{self.url}/servicex/" f"transformation/{request_id}", | ||||||
headers=headers, | ||||||
) as r: | ||||||
if r.status == 401: | ||||||
# perhaps we just ran out of auth validity the last time? | ||||||
# refetch auth then raise an error for retry | ||||||
headers = await self._get_authorization(True) | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
if r.status == 404: | ||||||
raise ValueError(f"Transform ID {request_id} not found") | ||||||
elif r.status > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation: " | ||||||
f"{r.status} - {error_message}" | ||||||
) | ||||||
o = await r.json() | ||||||
return TransformStatus(**o) | ||||||
) | ||||||
if r.status_code == 401: | ||||||
# perhaps we just ran out of auth validity the last time? | ||||||
# refetch auth then raise an error for retry | ||||||
headers = await self._get_authorization(True) | ||||||
raise AuthorizationError( | ||||||
f"Not authorized to access serviceX at {self.url}" | ||||||
) | ||||||
if r.status_code == 404: | ||||||
raise ValueError(f"Transform ID {request_id} not found") | ||||||
elif r.status_code > 400: | ||||||
error_message = await _extract_message(r) | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error during transformation: " | ||||||
f"{r.status_code} - {error_message}" | ||||||
) | ||||||
o = await r.json() | ||||||
return TransformStatus(**o) | ||||||
except RuntimeError as e: | ||||||
raise RuntimeError( | ||||||
"ServiceX WebAPI Error " f"while getting transform status: {e}" | ||||||
) | ||||||
raise RuntimeError("ServiceX WebAPI: unable to retrieve transform status") |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Aliasing
httpx.AsyncClient
asClientSession
can be confusing given its similarity to aiohttp; consider using a distinct alias likeAsyncClient
for clarity.Copilot uses AI. Check for mistakes.