Add support for httpx as backend #1085
Conversation
aiobotocore/httpsession.py
Outdated
# previously data was wrapped in _IOBaseWrapper
# github.com/aio-libs/aiohttp/issues/1907
# I haven't researched whether that's relevant with httpx.
ya silly decision of aiohttp, they took over the stream. Most likely httpx does the right thing. I think to get around the sync/async thing we can just make a stream wrapper that hides the relevant methods...I think I did this somewhere...will try to remember
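For context, the aiohttp-era workaround referenced in the diff above is roughly this: a no-op close so aiohttp cannot close a stream it does not own (a sketch from memory, not the exact source):

import wrapt


class _IOBaseWrapper(wrapt.ObjectProxy):
    def close(self):
        # no-op: the stream is owned by the caller (botocore), not aiohttp,
        # so the close() aiohttp issues when it takes over the stream is
        # deliberately swallowed
        pass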
Would the current tests catch if httpx didn't do the right thing?
hmm, we just need a test that for example passes a file handle to upload to s3, and then make sure the file handle is still open after the put_object. I think we don't have one, mind adding a really small test of this?
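Something like this minimal sketch could work, assuming pytest-asyncio and moto-backed fixtures along the lines of what the suite already provides (s3_client and bucket here are placeholder fixture names):

import pytest


@pytest.mark.asyncio
async def test_put_object_leaves_file_handle_open(s3_client, bucket, tmp_path):
    # hypothetical sketch: s3_client/bucket stand in for the suite's fixtures
    path = tmp_path / "data.bin"
    path.write_bytes(b"hello world")
    with path.open("rb") as f:
        await s3_client.put_object(Bucket=bucket, Key="key", Body=f)
        # the backend must not have taken over and closed our handle
        assert not f.closed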
I started wondering whether the way that ...
Codecov Report
Attention: Patch coverage is ...
Additional details and impacted files
@@ Coverage Diff @@
## master #1085 +/- ##
==========================================
- Coverage 91.03% 90.25% -0.78%
==========================================
Files 64 65 +1
Lines 6745 7021 +276
==========================================
+ Hits 6140 6337 +197
- Misses 605 684 +79
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Whooooo, all tests are passing!!!! current TODOs:
codecov is very sad, but most of that is due to me duplicating code that wasn't covered to start with, or extending tests that aren't run in CI. I'll try to make it very slightly less sad, but making it completely unsad is very much out of scope for this PR. Likewise RTD is failing... and I think that's unrelated to the PR?
Add no-httpx run to CI on 3.12. Tests can now run without httpx installed. Exclude `if TYPE_CHECKING` blocks from coverage. Various code cleanup.
…ive errors on connector args not compatible with httpx. Remove proxy code and raise NotImplementedError. fix/add tests
@thejcannon @aneeshusa if you wanna do a review pass
Hey @thehesiod what's the feeling on this? It is turning out to be a messier and more disruptive change than initially thought in #749. I can pull out some of the changes to a separate PR to make this a bit smaller at least.
hey sorry been down with a cold, will look asap. I don't mind big PRs |
# AWS has a 20 second idle timeout:
# https://web.archive.org/web/20150926192339/https://forums.aws.amazon.com/message.jspa?messageID=215367
# "httpx default timeout is 5s so set something reasonable here"
self._connector_args: dict[str, Any] = {'keepalive_timeout': 12}
may want to make this an official param as well
limits = httpx.Limits(
    max_connections=self._max_pool_connections,
    # 5 is httpx default, specifying None is no limit
    keepalive_expiry=self._connector_args.get('keepalive_timeout', 5),
let's do this in one place with one default; there are now two defaults, a 5 and a 12. Just set it once in __init__.
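A minimal sketch of that (the class and constant names here are illustrative, not from the PR):

import httpx

# hypothetical: a single module-level default, consumed only in __init__
DEFAULT_KEEPALIVE_TIMEOUT = 12  # AWS idles connections at ~20s; httpx defaults to 5s


class HttpxSessionSketch:
    def __init__(self, max_pool_connections: int, connector_args: dict | None = None):
        connector_args = connector_args or {}
        self._max_pool_connections = max_pool_connections
        self._keepalive_timeout = connector_args.get(
            'keepalive_timeout', DEFAULT_KEEPALIVE_TIMEOUT
        )

    def _limits(self) -> httpx.Limits:
        # no second default here; __init__ is the single source of truth
        return httpx.Limits(
            max_connections=self._max_pool_connections,
            keepalive_expiry=self._keepalive_timeout,
        )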
# TODO [httpx]: I put logic here to minimize diff / accidental downstream
# consequences - but can probably put this logic in __init__
if self._cert_file and self._key_file is None:
    cert = self._cert_file
elif self._cert_file:
    cert = (self._cert_file, self._key_file)
else:
    cert = None
both should just do it once in one place, we should match what botocore does as that's the ultimate diff we need to maintain: https://github.com/boto/botocore/blob/dc33566f6cae5a2a0462aadb4e23355fabd26fa5/botocore/httpsession.py#L310-L315. Looks like it should just be done in __init__
what this logic is doing is essentially undoing the logic in __init__ (from botocore): we want to reassemble the client_cert variable to pass it into httpx. So the alternative to this logic is saving an additional class variable self._client_cert = client_cert in the __init__.
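Roughly like this sketch (the class name is illustrative; httpx.AsyncClient does accept either a single path or a (cert, key) tuple for cert):

import httpx


class HttpxSessionSketch:
    def __init__(self, client_cert=None):
        # keep botocore's client_cert exactly as given: either a cert-file
        # path or a (cert, key) tuple - the same shape httpx accepts, so no
        # reassembly is needed at request time
        self._client_cert = client_cert

    def _create_client(self) -> httpx.AsyncClient:
        return httpx.AsyncClient(cert=self._client_cert)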
cert = None

# TODO [httpx]: skip_auto_headers={'Content-TYPE'} ?
# TODO [httpx]: auto_decompress=False ?
most likely don't need either of these; the auto_decompress was a bad aiohttp default decision. Most likely content-type as well... they were trying to be too smart ;)
"Proxy support not implemented with httpx as backend." | ||
) | ||
|
||
# TODO: handle socket_options |
should we error if anything is set, until it's handled?
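For example, mirroring the proxy NotImplementedError above (a sketch, assuming the session keeps the parsed options in a self._socket_options attribute):

# hypothetical sketch: fail loudly until socket_options is wired up
if self._socket_options:
    raise NotImplementedError(
        'socket_options is not yet supported with httpx as backend'
    )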
if amt is not None:
    # We could do a fancy thing here and start doing calls to
    # aiter_bytes()/aiter_raw() and keep state
    raise ValueError(
        "httpx.Response.aread does not support reading a specific number of bytes"
    )
return await self.__wrapped__.aread()
if you swap to aiter_[bytes/text/raw], they take a chunk_size
ahh, I see how it only lets you specify the chunk size when the iterator is created, let me see, perhaps we can make a custom one
I had Junie create some code to support dynamic chunk sizes:
import io
import typing

# note: request_context and _get_content_decoder are httpx internals, which
# is why this would need a tight version lock on httpx


class Response:
    async def aiter_bytes_dynamic(
        self, chunk_size_func: typing.Callable[[], int | None]
    ) -> typing.AsyncIterator[bytes]:
        """
        A byte-iterator over the decoded response content with dynamic chunk sizes.

        The chunk size is determined by a callable that is invoked for each chunk.
        This allows us to handle gzip, deflate, brotli, and zstd encoded responses
        with varying chunk sizes during iteration.

        Args:
            chunk_size_func: A callable that returns the chunk size to use for
                the next chunk. If it returns None, the entire remaining
                content will be returned as one chunk.
        """
        if hasattr(self, "_content"):
            # If content is already loaded, use the dynamic chunking on the full content
            buffer = io.BytesIO(self._content)
            while True:
                chunk_size = chunk_size_func()
                if chunk_size is None:
                    # Return all remaining content
                    remaining = buffer.read()
                    if remaining:
                        yield remaining
                    break
                chunk = buffer.read(chunk_size)
                if not chunk:
                    break
                yield chunk
        else:
            decoder = self._get_content_decoder()
            chunker = DynamicByteChunker(chunk_size_func=chunk_size_func)
            with request_context(request=self._request):
                async for raw_bytes in self.aiter_raw():
                    decoded = decoder.decode(raw_bytes)
                    for chunk in chunker.decode(decoded):
                        yield chunk
                decoded = decoder.flush()
                for chunk in chunker.decode(decoded):
                    yield chunk  # pragma: no cover
                for chunk in chunker.flush():
                    yield chunk
and
import io
import typing


class DynamicByteChunker:
    """
    Handles returning byte content in dynamically-sized chunks.

    The chunk size is determined by a callable that is invoked for each chunk.
    """

    def __init__(self, chunk_size_func: typing.Callable[[], int | None]) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size_func = chunk_size_func

    def decode(self, content: bytes) -> list[bytes]:
        chunk_size = self._chunk_size_func()
        if chunk_size is None:
            return [content] if content else []
        self._buffer.write(content)
        chunks = []
        while self._buffer.tell() >= chunk_size:
            # Get current buffer value
            self._buffer.seek(0)
            value = self._buffer.getvalue()
            # Extract one chunk
            chunk = value[:chunk_size]
            chunks.append(chunk)
            # Update buffer with remaining data
            self._buffer.seek(0)
            self._buffer.write(value[chunk_size:])
            self._buffer.truncate()
            # Get new chunk size for next iteration
            chunk_size = self._chunk_size_func()
            if chunk_size is None:
                # If chunk_size becomes None, return remaining buffer as one chunk
                remaining = self._buffer.getvalue()
                if remaining:
                    chunks.append(remaining)
                self._buffer.seek(0)
                self._buffer.truncate()
                break
        return chunks

    def flush(self) -> list[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
even made a test for itself lol
import asyncio

from httpx._content import ByteStream
from httpx._models import Response


class TestDynamicChunking:
    def test_aiter_bytes_dynamic_with_loaded_content(self):
        """Test aiter_bytes_dynamic with already loaded content."""
        content = b"0123456789" * 10  # 100 bytes
        response = Response(200, content=content)

        # Test with increasing chunk sizes
        chunk_sizes = [1, 2, 3, 4, 5]
        chunk_size_iter = iter(chunk_sizes)
        chunks = []

        async def get_chunks():
            nonlocal chunks
            async for chunk in response.aiter_bytes_dynamic(lambda: next(chunk_size_iter, None)):
                chunks.append(chunk)

        asyncio.run(get_chunks())

        # We should get chunks of sizes 1, 2, 3, 4, 5, and then the rest (85 bytes)
        assert len(chunks) == len(chunk_sizes) + 1
        assert chunks[0] == content[:1]
        assert chunks[1] == content[1:3]
        assert chunks[2] == content[3:6]
        assert chunks[3] == content[6:10]
        assert chunks[4] == content[10:15]
        assert chunks[5] == content[15:]

    def test_aiter_bytes_dynamic_with_stream(self):
        """Test aiter_bytes_dynamic with streaming content."""
        content = b"0123456789" * 10  # 100 bytes
        stream = ByteStream(content)
        response = Response(200, stream=stream)

        # Test with alternating chunk sizes
        chunk_sizes = [5, 10, 15, 20]
        chunk_size_iter = iter(chunk_sizes)
        chunks = []

        async def get_chunks():
            nonlocal chunks
            async for chunk in response.aiter_bytes_dynamic(lambda: next(chunk_size_iter, None)):
                chunks.append(chunk)

        asyncio.run(get_chunks())

        # Verify the chunks
        expected_chunks = []
        pos = 0
        for size in chunk_sizes:
            expected_chunks.append(content[pos:pos + size])
            pos += size
        if pos < len(content):
            expected_chunks.append(content[pos:])
        assert chunks == expected_chunks
also looks like you can make that an external helper using Response.content, Response.request and, unfortunately, the private function _get_content_decoder. Otherwise may need to create a custom transport to have a subclass of the Response class.
is this important enough that we want to maintain a helper of this complexity? As opposed to instructing people to use the streaming tools built into httpx?
I think we want the ability to swap between aiohttp/httpx with minimal changes to avoid friction. Like at work we'll probably do this for http/2 support. Unfortunately/fortunately we started with aiohttp so we're stuck with that set of defaults. I just went through this exercise at my work and ended up making part of this work; here's what I came up with. We'll need a tighter version lock to httpx though due to the use of a private attribute, probably patch version; I mean technically for aiohttp we should really be doing that too. Actually we can potentially make only the read w/ content length go through the mechanism that needs the private ref, so the normal way would be safer. Perhaps we open a request for httpx to natively support this so later we can drop it. Thoughts?
import aiohttp
import httpx
import wrapt


class AIOHTTPResponseAdapter(wrapt.ObjectProxy):
    def __init__(self, obj: aiohttp.ClientResponse | httpx.Response):
        super().__init__(obj)
        self.__is_httpx = hasattr(obj, 'status_code')

    async def text(self):
        if self.__is_httpx:
            await self.__wrapped__.aread()
            return self.__wrapped__.text
        return await self.__wrapped__.text()

    @property
    def reason(self):
        if self.__is_httpx:
            return self.__wrapped__.reason_phrase
        return self.__wrapped__.reason

    @property
    def status(self):
        if self.__is_httpx:
            return self.__wrapped__.status_code
        return self.__wrapped__.status

    async def json(self, **kwargs):
        if self.__is_httpx:
            await self.__wrapped__.aread()
            return self.__wrapped__.json(**kwargs)
        return await self.__wrapped__.json(**kwargs)

    async def read(self):
        if self.__is_httpx:
            return await self.__wrapped__.aread()  # technically we implement this
        return await self.__wrapped__.read()  # aiohttp's read() is a coroutine

    @property
    def request_info(self):
        if self.__is_httpx:
            request: httpx.Request = self.__wrapped__.request
            return aiohttp.RequestInfo(request.url, request.method, request.headers, request.url)
        return self.__wrapped__.request_info

    def wrapped_obj(self):
        return self.__wrapped__


class SingleEnterCtx(wrapt.ObjectProxy):
    def __init__(self, obj):
        super().__init__(obj)
        self.__response = None

    async def __aenter__(self):
        if not self.__response:
            self.__response = AIOHTTPResponseAdapter(await self.__wrapped__.__aenter__())
        return self.__response

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
and then used by wrapping:
return SingleEnterCtx(self._http_client.stream(
    method,
    self._base_url.join(api),
    data=data,
    auth=auth,
    headers=headers,
    **kwargs
))
I think we want the ability to swap between aiohttp/httpx with minimal changes to avoid friction. Like at work we'll probably do this for http/2 support. Unfortunately/fortunately we started with aiohttp so we're stuck with that set of defaults
Of course we want the transition from aiohttp to httpx to be as smooth as possible, but ideally the httpx backend would have feature parity such that nobody would need to go back to the aiohttp backend once they have transitioned the code. Here the transition would mean using the underlying httpx stream tools and needing to specify the chunk size up front. But if dynamic chunk sizing is required for some usage and widely used, then I suppose we don't have an option.
async def aclose(self):
    return self.__wrapped__.close()
hmm, I'm torn about this. If we do expose this it should also await self.__wrapped__.wait_for_close(). Ideally people should always use the context manager for this close, ex:

async with response['Body'] as stream:

I suppose adding aclose won't hurt, however a better fix I think is raising an error on the close method for httpx and telling people to use the context manager.
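i.e. the full pattern would be something like this (client, bucket and key are placeholders):

response = await client.get_object(Bucket=bucket, Key=key)
async with response['Body'] as stream:
    data = await stream.read()
# the body is closed on exit, whichever backend produced it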
if you have an aclose you could get state issues, like if w/in the context manager you create a task for aclose and then exit the context. httpx would have to do locking to make sure "bad things" don't happen :)
This addition is adding aclose to the aiohttp StreamingBody; httpx.Response already defines aclose (but not close), in order to not have to have a conditional in the tests (and make transitioning ...
Given that httpx already supports aclose I'm a bit wary of banning it, but I def agree that the context manager is the way to do it.
The other way of getting the tests to not look awful is to change the 8 tests in test_basic_s3 to all use the context manager form instead (and add a dedicated one to test close + aclose).
yea it's a tough call, aiobotocore is trying to make the interface match botocore as much as possible, which has close and not aclose; that's why for aiohttp we also didn't create an aclose. On aiohttp's part, the fact they made the close method sync was very questionable. We should have exposed an async close that called their sync close method; that's our bad, not sure whose fault that was :) I still think the real issue here is the httpx interface. They should have had a base response class, and the sync version had a sync close, and the async version had an async close. I don't think we should propagate their design decisions into our interfaces. It sounds like they want to support sync and async versions of their interfaces at the same time, which we do not want to do. That would mean all our other interfaces should support sync versions as well, no thank you ;)
Actually I have a better idea, I think. How about we major rev aiobotocore and mark this issue as a breaking change such that we're going to expose the close method as an async method and not a sync method anymore? Given that python now warns you about non-awaited coroutines, they'll get a warning if they do it incorrectly after a major rev.
thoughts? what do you think @jakob-keller, @terricain, since this may affect aioboto3 if it exposes close?
[...] expose the close method as an async method and not a sync method anymore?
I couldn't find StreamingBody.close() anywhere. Does that even exist at the moment? The way I understand it, it is meant to be used purely as an asynchronous context manager. No need for close nor aclose. Any clean up should be performed in __aexit__(), directly or indirectly, by delegating to the wrapped object.
@jakob-keller it's because we use a wrapt.ObjectProxy around the ClientResponse, so we expose all the properties/methods it exposes.
@jakob-keller I do like that idea of removing the method altogether. The weird thing is that you get it w/o having to aenter it, so having a close on it seems ok
plus to have parallelism with botocore
The convention in trio is that close is sync and aclose is async, and there's a decent number of classes that support both when close is a trivial operation that only sets an internal flag or something.
It is quite confusing that StreamingBody/HttpxStreamingBody does not indicate what the actual API is; might be worth adding some comments there. But the fact that we're doing a thin wrapper around a library-specific object makes it quite tricky to be library-agnostic while also trying to adhere to the botocore API. It's possible we might want to step away from that and define the API more strictly, while also leaving the possibility of accessing the underlying raw stream fairly easily when people want to step away from it.
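A toy illustration of that trio-style pairing (not aiobotocore code; close stays sync because it only flips a flag):

class FlaggedStream:
    def __init__(self):
        self.closed = False

    def close(self):
        # trivial: only records state, so it can stay synchronous
        self.closed = True

    async def aclose(self):
        # async twin for symmetry; a real stream might await I/O here
        self.close()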
if httpx and isinstance(body, httpx.Response):
    # httpx does not support `.aread(1)`
    byte_iterator = body.aiter_raw(1)
    chunk1 = await byte_iterator.__anext__()
    chunk2 = b""
    async for b in byte_iterator:
        chunk2 += b
else:
    chunk1 = await body.read(1)
    chunk2 = await body.read()
it does, but you can't use aread; you need to use the other methods I mentioned in another file
ok finished first review iteration, sorry for delay! really appreciate all this work, we're getting there!
a bunch of tests in ... doesn't look like I managed to harmonize the CI changes either, ugh
aiobotocore/httpxsession.py
Outdated
response.headers.items()
)

http_response = aiobotocore.awsrequest.HttpxAWSResponse(
btw does this work with SigV4 signing? I'm having a few issues getting this to work on our system due to differences in signature calculation
I'm on the path; we should probably have another set of tests that does http/2... however, not sure if moto supports this. Also we should have a config param to enable this.
ok figured out my issue: it's critical when using http/2 w/ httpx to not have it pre-populate the Connection header when creating the Client. My fix:

self._http_session = await self._exit_stack.enter_async_context(httpx.AsyncClient(http2=True))
# with http/2 you don't want Connection in the headers, or it breaks signing when h2 later strips this header out
del self._http_session.headers['Connection']
I'm not familiar with this, so would be great if you could write a test for it
so this is actually an action for you: it would be another global httpx test case scenario where we enable http2 with httpx via a new config variable
actually I forgot moto doesn't support http/2 yet since it uses werkzeug; I just created an issue to see if they'll add support: werkzeug. However, a work-around is perhaps adding an http/2 in-proc proxy.
I asked chatgpt; here's what it recommended after a little prodding. It's not great, but a good starting-off point.
import os
import ssl
import subprocess
import tempfile
import time

import pytest
from moto.server import ThreadedMotoServer


@pytest.fixture(scope="session")
def shared_self_signed_cert():
    cert_dir = tempfile.TemporaryDirectory()
    cert_path = os.path.join(cert_dir.name, "cert.pem")
    key_path = os.path.join(cert_dir.name, "key.pem")
    subprocess.run([
        "openssl", "req", "-x509", "-newkey", "rsa:2048",
        "-keyout", key_path, "-out", cert_path,
        "-days", "1", "-nodes", "-subj", "/CN=localhost"
    ], check=True)
    yield cert_path, key_path
    cert_dir.cleanup()


@pytest.fixture
def isolated_moto_nghttpx(shared_self_signed_cert):
    cert_path, key_path = shared_self_signed_cert

    # Start Moto on dynamic port
    moto_server = ThreadedMotoServer()
    moto_server.start()
    moto_port = moto_server.port

    # Choose proxy port (could also use port=0 if launching via socket discovery)
    proxy_port = moto_port + 10000
    nghttpx_proc = subprocess.Popen([
        "nghttpx",
        f"-f127.0.0.1,{proxy_port};proto=h2",
        f"-b127.0.0.1,{moto_port}",
        "--tls-cert=" + cert_path,
        "--tls-key=" + key_path,
        "--no-ocsp"
    ])
    time.sleep(0.5)

    yield f"https://localhost:{proxy_port}"

    nghttpx_proc.terminate()
    moto_server.stop()
according to chatgpt the ubuntu runners already have it available, so just need to add:

- name: Install nghttpx
  run: |
    sudo apt-get update
    sudo apt-get install -y nghttp2
I can't get the above to work, it fails to connect. I also tried with nginx but with similar problems. But I'm also not sure where to apply your 'Connection' fix.
But we could maybe leave this for a followup PR?
First step of #749 as described in #749 (comment)
I was tasked with implementing this, but it's been a bit of a struggle not being very familiar with aiohttp, httpx or aiobotocore - and there being ~zero in-line types. But I think I've fixed enough of the major problems that it's probably useful to share my progress.
There's a bunch of random types added. I can split those off into a separate PR or remove them if requested. Likewise for from __future__ import annotations.
TODO:
- aiobotocore/aiobotocore/httpsession.py, lines 478 to 534 in b19bc09: AFAICT you can only configure proxies per-client in httpx. So need to move the logic for it, and cannot use botocore.httpsession.ProxyConfiguration.proxy_[url,headers]_for(request.url)
- BOTO_EXPERIMENTAL__ADD_PROXY_HOST_HEADER: seems not possible to do when configuring proxies per-client?

No longer TODOs after changing the scope to implement httpx alongside aiohttp:
- test_patches previously cared about aiohttp. That can probably be retired?
- tests.mock_server.AIOServer?
- NotImplementedError:
  - use_dns_cache: did not find any mentions of dns caches on a quick skim of httpx docs
  - force_close: same. Can maybe find out more by digging into docs on what this option does in aiohttp.
  - resolver: this is an aiohttp.abc.AbstractResolver, which is obviously a no-go.
  - yarl.URL(url, encoding=True): httpx does not support yarl. I don't know what this achieved (maybe the non-normalization??), so skipping it for now.

Some extra tests would probably also be good, but not super critical when we're just implementing httpx alongside aiohttp.