
Add support for httpx as backend #1085


Open · wants to merge 38 commits into master

Conversation

jakkdl
Contributor

@jakkdl jakkdl commented Feb 2, 2024

First step of #749 as described in #749 (comment)

I was tasked with implementing this, but it's been a bit of a struggle not being very familiar with aiohttp, httpx or aiobotocore - and there being ~zero in-line types. But I think I've fixed enough of the major problems that it's probably useful to share my progress.

There's a bunch of random types added. I can split those off into a separate PR or remove if requested. Likewise for from __future__ import annotations.

TODO:

  • exceptions
    • retryable_exceptions: mostly just need to go through all httpx exceptions and decide which ones are fine
    • The mapping between httpx exceptions and aiobotocore exceptions can likely be improved.
      # **previous exception mapping**
      # aiohttp.ClientSSLError -> SSLError
      # aiohttp.ClientProxyConnectiorError
      # aiohttp.ClientHttpProxyError -> ProxyConnectionError
      # aiohttp.ServerDisconnectedError
      # aiohttp.ClientPayloadError
      # aiohttp.http_exceptions.BadStatusLine -> ConnectionClosedError
      # aiohttp.ServerTimeoutError -> ConnectTimeoutError|ReadTimeoutError
      # aiohttp.ClientConnectorError
      # aiohttp.ClientConnectionError
      # socket.gaierror -> EndpointConnectionError
      # asyncio.TimeoutError -> ReadTimeoutError
      # **possible httpx exception mapping**
      # httpx.CookieConflict
      # httpx.HTTPError
      # * httpx.HTTPStatusError
      # * httpx.RequestError
      # * httpx.DecodingError
      # * httpx.TooManyRedirects
      # * httpx.TransportError
      # * httpx.NetworkError
      # * httpx.CloseError -> ConnectionClosedError
      # * httpx.ConnectError -> EndpointConnectionError
      # * httpx.ReadError
      # * httpx.WriteError
      # * httpx.ProtocolError
      # * httpx.LocalProtocolError -> SSLError??
      # * httpx.RemoteProtocolError
      # * httpx.ProxyError -> ProxyConnectionError
      # * httpx.TimeoutException
      # * httpx.ConnectTimeout -> ConnectTimeoutError
      # * httpx.PoolTimeout
      # * httpx.ReadTimeout -> ReadTimeoutError
      # * httpx.WriteTimeout
      # * httpx.UnsupportedProtocol
      # * httpx.InvalidURL
      except httpx.ConnectError as e:
          raise EndpointConnectionError(endpoint_url=request.url, error=e)
      except (socket.gaierror,) as e:
          raise EndpointConnectionError(endpoint_url=request.url, error=e)
      except asyncio.TimeoutError as e:
          raise ReadTimeoutError(endpoint_url=request.url, error=e)
      except httpx.ReadTimeout as e:
          raise ReadTimeoutError(endpoint_url=request.url, error=e)
      except NotImplementedError:
          raise
      except Exception as e:
          message = 'Exception received when sending urllib3 HTTP request'
          logger.debug(message, exc_info=True)
          raise HTTPClientError(error=e)
  • proxy support
    • postponed to later PR
    • this was previously handled per-request, but AFAICT you can only configure proxies per-client in httpx. So the logic needs to move, and we cannot use botocore.httpsession.ProxyConfiguration.proxy_[url,headers]_for(request.url)
    • raising of ProxyConnectionError is very ugly atm, and probably not "correct"?
    • BOTO_EXPERIMENTAL__ADD_PROXY_HOST_HEADER
      • seems not possible to do when configuring proxies per-client?
  • wrap io.IOBase data in a non-sync-iterable async iterable
    • converted to bytes for now.
  • I have added change info to CHANGES.rst
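As an aside, the exception translation sketched in the comment block above lends itself to a table-driven helper. Here is a stdlib-only sketch of the idea; the real table would map httpx classes to botocore exceptions as listed above, but the helper itself is generic and the stdlib exceptions below are only stand-ins:

```python
# Table-driven exception translation, sketched with stdlib exceptions.
# In the real backend the keys would be httpx classes (httpx.ConnectError,
# httpx.ReadTimeout, ...) and the values botocore exceptions
# (EndpointConnectionError, ReadTimeoutError, ...).

def translate_exception(exc, table):
    """Return the mapped target for the first matching source class, or None."""
    for source_cls, target in table.items():
        if isinstance(exc, source_cls):
            return target
    return None

# Illustration: insertion order matters, since the first matching entry wins,
# so narrower classes must come before broader ones.
MAPPING = {
    ConnectionError: 'EndpointConnectionError',
    TimeoutError: 'ReadTimeoutError',
    OSError: 'HTTPClientError',  # catch-all for remaining OS-level errors
}
```

A dict keeps the mapping auditable in one place, which is what the TODO above is really asking for.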

No longer TODOs after changing the scope to implement httpx alongside aiohttp:

  • test_patches previously cared about aiohttp. That can probably be retired?
  • replace aiohttp with httpx in tests.mock_server.AIOServer?
  • The following connector_args now raise NotImplementedError:
    • use_dns_cache: did not find any mentions of dns caches on a quick skim of httpx docs
    • force_close: same. Can maybe find out more by digging into docs on what this option does in aiohttp.
    • resolver: this is an aiohttp.abc.AbstractResolver which is obviously a no-go.
      • raise error for code passing this
      • figure out equivalent functionality for httpx
  • URLs were previously wrapped with yarl.URL(url, encoded=True). httpx does not support yarl. I don't know what this achieved (maybe the non-normalization??), so skipping it for now.

Some extra tests would probably also be good, but not super critical when we're just implementing httpx alongside aiohttp.


# previously data was wrapped in _IOBaseWrapper
# github.com/aio-libs/aiohttp/issues/1907
# I haven't researched whether that's relevant with httpx.
Collaborator

ya, silly decision by aiohttp: they took over the stream. Most likely httpx does the right thing. I think to get around the sync/async thing we can just make a stream wrapper that hides the relevant methods... I think I did this somewhere... will try to remember

Contributor Author

Would the current tests catch if httpx didn't do the right thing?

Collaborator

hmm, we just need a test that, for example, passes a file handle to upload to s3, and then makes sure the file handle is still open after the put_object. I think we don't have one; mind adding a really small test of this?

@jakkdl
Contributor Author

jakkdl commented Feb 6, 2024

I started wondering whether response.StreamingBody should wrap httpx.Response or one of its iterators (aiter_bytes, aiter_text, aiter_lines or aiter_raw), but am now starting to think that maybe it doesn't make sense to have it at all, and we should just surface the httpx.Response object to the user and let them handle it as they want.

The way aiohttp.StreamReader behaves is just different enough that providing a translation layer that handles httpx.Response streams the same way becomes quite clunky/inefficient/tricky. StreamingBody.iter_chunks should be done by specifying the chunk size when calling httpx.Response.aiter_bytes, and StreamingBody.iter_lines should use httpx.Response.aiter_lines; but the current API does nothing to stop you from reading one chunk, then one byte, whereas httpx.Response (very reasonably) only lets you initialize the iterators once.
Implementing iter_chunks/iter_lines/etc. as reading one byte at a time with await anext() on an aiter_raw sounds awful, since there's no read() method that can return a set number of bytes. That in general makes StreamingBody.read() quite clunky to implement.
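To make the clunkiness concrete: any adapter offering read(n) on top of a fixed-chunk async iterator has to buffer. A minimal stdlib sketch of that buffering, where `fake_aiter_raw` stands in for httpx.Response.aiter_raw() and all names are illustrative, not from the PR:

```python
import asyncio

async def fake_aiter_raw(data, chunk_size=4):
    # Stand-in for httpx.Response.aiter_raw(): yields fixed-size chunks.
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

class BufferedReader:
    """Rough sketch of what a StreamingBody.read() shim would have to do:
    pull chunks from the async iterator and slice them to the caller's size."""

    def __init__(self, aiter):
        self._aiter = aiter
        self._buf = b""
        self._done = False

    async def read(self, n=None):
        # Fill the buffer until we have n bytes (or the stream ends).
        while not self._done and (n is None or len(self._buf) < n):
            try:
                self._buf += await self._aiter.__anext__()
            except StopAsyncIteration:
                self._done = True
        if n is None:
            out, self._buf = self._buf, b""
        else:
            out, self._buf = self._buf[:n], self._buf[n:]
        return out

async def demo():
    r = BufferedReader(fake_aiter_raw(b"0123456789"))
    first = await r.read(3)
    rest = await r.read()
    return first, rest
```

The state-keeping (leftover buffer, end-of-stream flag) is exactly the bookkeeping the comment above calls clunky.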


codecov bot commented Feb 19, 2024

Codecov Report

Attention: Patch coverage is 71.29338% with 91 lines in your changes missing coverage. Please review.

Project coverage is 90.25%. Comparing base (cafe180) to head (e257182).
Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
aiobotocore/httpxsession.py 68.46% 41 Missing ⚠️
tests/test_basic_s3.py 50.00% 19 Missing ⚠️
tests/conftest.py 68.42% 18 Missing ⚠️
aiobotocore/httpchecksum.py 83.33% 4 Missing ⚠️
aiobotocore/_endpoint_helpers.py 71.42% 2 Missing ⚠️
aiobotocore/awsrequest.py 75.00% 2 Missing ⚠️
aiobotocore/endpoint.py 77.77% 2 Missing ⚠️
tests/test_lambda.py 75.00% 2 Missing ⚠️
aiobotocore/response.py 90.90% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1085      +/-   ##
==========================================
- Coverage   91.03%   90.25%   -0.78%     
==========================================
  Files          64       65       +1     
  Lines        6745     7021     +276     
==========================================
+ Hits         6140     6337     +197     
- Misses        605      684      +79     
Flag Coverage Δ
unittests 90.25% <71.29%> (-0.78%) ⬇️


@jakkdl jakkdl changed the title Replace aiohttp with httpx Add support for httpx as alternate backend Feb 19, 2024
@jakkdl jakkdl changed the title Add support for httpx as alternate backend Add support for httpx as backend Feb 19, 2024
@jakkdl
Contributor Author

jakkdl commented Feb 19, 2024

Whooooo, all tests are passing!!!!
though I did an ugly hack with test_non_normalized_key_paths - I understand nothing about that test, so I currently made it pass if httpx returns a normalized path.

current TODOs:

  • I should add a command-line parameter that sets the http backend to be tested, so I can set up a CI environment without httpx installed to make sure that works.
  • Retryable exceptions.
    • Maybe try to write a test for it
  • figure out the branches in convert_to_response_dict.
    • I think they're fine?
  • ~~figure out proxies~~ or raise NotImplementedError.
    • There is at least one test that sorta checks it so if raising I need to work around it.
  • Maybe add test for http_session_cls
  • Add documentation - RTD is broken?

codecov is very sad, but most of that is due to me duplicating code that wasn't covered to start with, or extending tests that aren't run in CI. I'll try to make it very slightly less sad, but making it completely unsad is very much out of scope for this PR.

Likewise RTD is failing ... and I think that's unrelated to the PR?

Add no-httpx run to CI on 3.12
Tests can now run without httpx installed.
Exclude `if TYPE_CHECKING` blocks from coverage.
various code cleanup
…ive errors on connector args not compatible with httpx. Remove proxy code and raise NotImplementedError. fix/add tests
@jakkdl jakkdl marked this pull request as ready for review February 20, 2024 12:28
@jakkdl
Contributor Author

jakkdl commented Feb 21, 2024

@thejcannon @aneeshusa if you wanna do a review pass

@jakkdl jakkdl requested a review from thehesiod March 1, 2024 10:26
@jakkdl
Contributor Author

jakkdl commented Mar 20, 2024

Hey @thehesiod, what's the feeling on this? It's turning out to be a messier and more disruptive change than initially thought in #749. I can pull some of the changes out into a separate PR to make this a bit smaller at least

@thehesiod
Collaborator

hey sorry been down with a cold, will look asap. I don't mind big PRs

# AWS has a 20 second idle timeout:
# https://web.archive.org/web/20150926192339/https://forums.aws.amazon.com/message.jspa?messageID=215367
# "httpx default timeout is 5s so set something reasonable here"
self._connector_args: dict[str, Any] = {'keepalive_timeout': 12}
Collaborator

may want to make this an official param as well

limits = httpx.Limits(
    max_connections=self._max_pool_connections,
    # 5 is the httpx default; specifying None means no limit
    keepalive_expiry=self._connector_args.get('keepalive_timeout', 5),
)
Collaborator

let's do this in one place with one default; there are now two defaults, a 5 and a 12. Just set it once in __init__
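A sketch of what "set it once in __init__" could look like. The class and attribute names here are placeholders for illustration, not the PR's actual code; the 12-second value is the one discussed above (below AWS's 20-second idle timeout):

```python
# Define the keep-alive default exactly once, instead of a 12 in __init__
# and a separate 5 in the httpx.Limits call.
DEFAULT_KEEPALIVE_TIMEOUT = 12  # seconds; AWS idles connections at 20s

class HttpxSession:  # hypothetical name for illustration
    def __init__(self, connector_args=None):
        connector_args = dict(connector_args or {})
        connector_args.setdefault('keepalive_timeout', DEFAULT_KEEPALIVE_TIMEOUT)
        self._connector_args = connector_args

    @property
    def keepalive_expiry(self):
        # Later code (e.g. building httpx.Limits) reads this single value,
        # so no second fallback default is needed.
        return self._connector_args['keepalive_timeout']
```

With this shape, the httpx.Limits call would pass `keepalive_expiry=self.keepalive_expiry` with no `.get(..., 5)` fallback.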

Comment on lines +139 to +146
# TODO [httpx]: I put logic here to minimize diff / accidental downstream
# consequences - but can probably put this logic in __init__
if self._cert_file and self._key_file is None:
    cert = self._cert_file
elif self._cert_file:
    cert = (self._cert_file, self._key_file)
else:
    cert = None
Collaborator

both should just do it once in one place; we should match what botocore does, as that's the ultimate diff we need to maintain: https://github.com/boto/botocore/blob/dc33566f6cae5a2a0462aadb4e23355fabd26fa5/botocore/httpsession.py#L310-L315. Looks like it should just be done in __init__

Contributor Author

what this logic is doing is essentially undoing the logic in __init__ (from botocore), we want to reassemble the client_cert variable to pass it into httpx.

So the alternative to this logic is saving an additional class variable self._client_cert = client_cert in the __init__
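For illustration, the "do it once" variant could normalize the botocore-style client_cert into the shape httpx's `cert` argument expects in a single helper. This is a hedged sketch under the assumption that `client_cert` arrives as a path, a (cert, key) tuple, or None, per the botocore logic linked above; the helper name is made up:

```python
# One-time normalization of client_cert, mirroring botocore's handling,
# so per-request code never has to reassemble it from _cert_file/_key_file.

def build_client_cert(client_cert):
    """Normalize client_cert into the form httpx's `cert` argument expects."""
    if client_cert is None:
        return None
    if isinstance(client_cert, str):
        # single file containing both cert and key
        return client_cert
    if isinstance(client_cert, tuple) and len(client_cert) == 2:
        # separate cert and key files
        return client_cert
    raise TypeError(f'Invalid client_cert: {client_cert!r}')
```

__init__ would then store `self._cert = build_client_cert(client_cert)` once and the request path would just pass it through.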

cert = None

# TODO [httpx]: skip_auto_headers={'Content-TYPE'} ?
# TODO [httpx]: auto_decompress=False ?
Collaborator
@thehesiod thehesiod May 7, 2025

most likely we don't need either of these; auto_decompress was a bad aiohttp default decision. Most likely Content-Type as well... they were trying to be too smart ;)

"Proxy support not implemented with httpx as backend."
)

# TODO: handle socket_options
Collaborator

should we raise an error if any of these are set, until they're handled?

Comment on lines +142 to +148
if amt is not None:
    # We could do a fancy thing here and start doing calls to
    # aiter_bytes()/aiter_raw() and keep state
    raise ValueError(
        "httpx.Response.aread does not support reading a specific number of bytes"
    )
return await self.__wrapped__.aread()
Collaborator

if you swap to aiter_[bytes/text/raw] they take a chunk_size

Collaborator

ahh, I see how it only lets you specify the chunk size when the iterator is created, let me see, perhaps we can make a custom one

Collaborator

I had Junie create some code to support dynamic chunk sizes:

class Response:
    async def aiter_bytes_dynamic(
        self, chunk_size_func: typing.Callable[[], int | None]
    ) -> typing.AsyncIterator[bytes]:
        """
        A byte-iterator over the decoded response content with dynamic chunk sizes.
        The chunk size is determined by a callable that is invoked for each chunk.

        This allows us to handle gzip, deflate, brotli, and zstd encoded responses
        with varying chunk sizes during iteration.

        Args:
            chunk_size_func: A callable that returns the chunk size to use for the next chunk.
                             If it returns None, the entire remaining content will be returned as one chunk.
        """
        if hasattr(self, "_content"):
            # If content is already loaded, use the dynamic chunking on the full content
            buffer = io.BytesIO(self._content)
            while True:
                chunk_size = chunk_size_func()
                if chunk_size is None:
                    # Return all remaining content
                    remaining = buffer.read()
                    if remaining:
                        yield remaining
                    break

                chunk = buffer.read(chunk_size)
                if not chunk:
                    break
                yield chunk
        else:
            decoder = self._get_content_decoder()
            chunker = DynamicByteChunker(chunk_size_func=chunk_size_func)
            with request_context(request=self._request):
                async for raw_bytes in self.aiter_raw():
                    decoded = decoder.decode(raw_bytes)
                    for chunk in chunker.decode(decoded):
                        yield chunk
                decoded = decoder.flush()
                for chunk in chunker.decode(decoded):
                    yield chunk  # pragma: no cover
                for chunk in chunker.flush():
                    yield chunk

and

class DynamicByteChunker:
    """
    Handles returning byte content in dynamically-sized chunks.
    The chunk size is determined by a callable that is invoked for each chunk.
    """

    def __init__(self, chunk_size_func: typing.Callable[[], int | None]) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size_func = chunk_size_func

    def decode(self, content: bytes) -> list[bytes]:
        chunk_size = self._chunk_size_func()
        if chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        chunks = []

        while self._buffer.tell() >= chunk_size:
            # Get current buffer value
            self._buffer.seek(0)
            value = self._buffer.getvalue()

            # Extract one chunk
            chunk = value[:chunk_size]
            chunks.append(chunk)

            # Update buffer with remaining data
            self._buffer.seek(0)
            self._buffer.write(value[chunk_size:])
            self._buffer.truncate()

            # Get new chunk size for next iteration
            chunk_size = self._chunk_size_func()
            if chunk_size is None:
                # If chunk_size becomes None, return remaining buffer as one chunk
                remaining = self._buffer.getvalue()
                if remaining:
                    chunks.append(remaining)
                    self._buffer.seek(0)
                    self._buffer.truncate()
                break

        return chunks

    def flush(self) -> list[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []

it even made a test for itself lol

import asyncio
from httpx._models import Response
from httpx._content import ByteStream

class TestDynamicChunking:
    def test_aiter_bytes_dynamic_with_loaded_content(self):
        """Test aiter_bytes_dynamic with already loaded content."""
        content = b"0123456789" * 10  # 100 bytes
        response = Response(200, content=content)

        # Test with increasing chunk sizes
        chunk_sizes = [1, 2, 3, 4, 5]
        chunk_size_iter = iter(chunk_sizes)
        chunks = []

        async def get_chunks():
            nonlocal chunks
            async for chunk in response.aiter_bytes_dynamic(lambda: next(chunk_size_iter, None)):
                chunks.append(chunk)

        asyncio.run(get_chunks())

        # We should get chunks of sizes 1, 2, 3, 4, 5, and then the rest (85 bytes)
        assert len(chunks) == len(chunk_sizes) + 1
        assert chunks[0] == content[:1]
        assert chunks[1] == content[1:3]
        assert chunks[2] == content[3:6]
        assert chunks[3] == content[6:10]
        assert chunks[4] == content[10:15]
        assert chunks[5] == content[15:]

    def test_aiter_bytes_dynamic_with_stream(self):
        """Test aiter_bytes_dynamic with streaming content."""
        content = b"0123456789" * 10  # 100 bytes
        stream = ByteStream(content)
        response = Response(200, stream=stream)

        # Test with alternating chunk sizes
        chunk_sizes = [5, 10, 15, 20]
        chunk_size_iter = iter(chunk_sizes)
        chunks = []

        async def get_chunks():
            nonlocal chunks
            async for chunk in response.aiter_bytes_dynamic(lambda: next(chunk_size_iter, None)):
                chunks.append(chunk)

        asyncio.run(get_chunks())

        # Verify the chunks
        expected_chunks = []
        pos = 0
        for size in chunk_sizes:
            expected_chunks.append(content[pos:pos+size])
            pos += size
        if pos < len(content):
            expected_chunks.append(content[pos:])

        assert chunks == expected_chunks

Collaborator

also looks like you can make that an external helper by using Response.content, Response.request and, unfortunately, the private function _get_content_decoder. Otherwise we may need to create a custom transport to have a subclass of the Response class

Contributor Author

is this important enough that we want to maintain a helper of this complexity, as opposed to instructing people to use the streaming tools built into httpx?

Collaborator

I think we want the ability to swap between aiohttp/httpx with minimal changes to avoid friction. Like at work we'll probably do this for http/2 support. Unfortunately/fortunately we started with aiohttp, so we're stuck with that set of defaults. I just went through this exercise at my work and ended up making part of this work; here's what I came up with. We'll need a tighter version lock to httpx though due to the use of a private attribute, probably patch version; technically for aiohttp we should really be doing that too. Actually, we can potentially make only the read-with-content-length path go through the mechanism that needs the private ref, so the normal path would be safer. Perhaps we open a request for httpx to natively support this so later we can drop it. Thoughts?

class AIOHTTPResponseAdapter(wrapt.ObjectProxy):
    def __init__(self, obj: aiohttp.ClientResponse | httpx.Response):
        super().__init__(obj)
        self.__is_httpx = hasattr(obj, 'status_code')

    async def text(self):
        if self.__is_httpx:
            await self.__wrapped__.aread()
            return self.__wrapped__.text

        return await self.__wrapped__.text()

    @property
    def reason(self):
        if self.__is_httpx:
            return self.__wrapped__.reason_phrase

        return self.__wrapped__.reason

    @property
    def status(self):
        if self.__is_httpx:
            return self.__wrapped__.status_code

        return self.__wrapped__.status

    async def json(self,  **kwargs):
        if self.__is_httpx:
            await self.__wrapped__.aread()
            return self.__wrapped__.json(**kwargs)

        return await self.__wrapped__.json(**kwargs)

    async def read(self):
        if self.__is_httpx:
            return await self.__wrapped__.aread()  # technically we implement this

        return self.__wrapped__.read()

    @property
    def request_info(self):
        if self.__is_httpx:
            request: httpx.Request = self.__wrapped__.request
            return  aiohttp.RequestInfo(request.url, request.method, request.headers, request.url)

        return self.__wrapped__.request_info

    def wrapped_obj(self):
        return self.__wrapped__

class SingleEnterCtx(wrapt.ObjectProxy):
    def __init__(self, obj):
        super().__init__(obj)
        self.__response = None

    async def __aenter__(self):
        if not self.__response:
            self.__response = AIOHTTPResponseAdapter(await self.__wrapped__.__aenter__())

        return self.__response

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)

and then used by wrapping:

return SingleEnterCtx(
    self._http_client.stream(
        method,
        self._base_url.join(api),
        data=data,
        auth=auth,
        headers=headers,
        **kwargs,
    )
)

Contributor Author

I think we want the ability to swap between aiohttp/httpx with minimal changes to avoid friction. Like at work we'll probably do this for http/2 support. Unfortunately/fortunately we started with aiohttp, so we're stuck with that set of defaults

Of course we want the transition from aiohttp to httpx to be as smooth as possible, but ideally the httpx backend would have feature parity such that nobody would need to go back to the aiohttp backend once they have transitioned their code. Here the transition would mean using the underlying httpx stream tools and needing to specify the chunk size up front. But if dynamic chunk sizing is required for some usage and widely used, then I suppose we don't have an option.

Comment on lines +133 to +134
async def aclose(self):
return self.__wrapped__.close()
Collaborator
@thehesiod thehesiod May 7, 2025

hmm, I'm torn about this. If we do expose this it should also await self.__wrapped__.wait_for_close(). Ideally people should always use the context manager for this close, ex:

async with response['Body'] as stream:

I suppose adding aclose won't hurt; however, a better fix I think is raising an error in the close method for httpx and telling people to use the context manager

Collaborator

if you have an aclose you could get state issues, like if within the context manager you create a task for aclose and then exit the context. httpx would have to do locking to make sure "bad things" don't happen :)

Contributor Author

This addition adds aclose to the aiohttp StreamingBody; httpx.Response already defines aclose (but not close). The point is to avoid a conditional in the tests (and to make transitioning smoother).

Given that httpx already supports aclose, I'm a bit wary of banning it, but I definitely agree that the context manager is the way to do it.

The other way of getting the tests to not look awful is to change the 8 tests in test_basic_s3 to all use the context-manager form instead (and add a dedicated one to test close+aclose).

Collaborator

yea, it's a tough call. aiobotocore is trying to make the interface match botocore as much as possible, which has close and not aclose; that's also why for aiohttp we didn't create an aclose. On aiohttp's part, the fact that they made the close method sync was very questionable. We should have exposed an async close that called their sync close method; that's our bad, not sure whose fault that was :) I still think the real issue here is the httpx interface. They should have had a base response class, with the sync version having a sync close and the async version an async close. I don't think we should propagate their design decisions into our interfaces. It sounds like they want to support sync and async versions of their interfaces at the same time, which we do not want to do. That would mean all our other interfaces should support sync versions as well, no thank you ;)

Actually, I have a better idea, I think. How about we major-rev aiobotocore and mark this issue as a breaking change, such that we expose the close method as an async method and not a sync method anymore? Given that Python now warns about non-awaited coroutines, they'll get a warning if they do it incorrectly after a major rev.

thoughts? what do you think @jakob-keller, @terricain since this may affect aioboto3 if it exposes close?

Collaborator

[...] expose the close method as an async method and not a sync method anymore?

I couldn't find StreamingBody.close() anywhere. Does that even exist at the moment? The way I understand it, it is meant to be used purely as an asynchronous context manager. No need for close nor aclose. Any clean-up should be performed in __aexit__() directly, or indirectly by delegating to the wrapped object.

Collaborator

@jakob-keller it's because we use a wrapt.ObjectProxy around the ClientResponse so we expose all the properties/method it exposes.

Collaborator

@jakob-keller I do like the idea of removing the method altogether. The weird thing is that you get it without having to aenter it, so having a close on it seems ok

Collaborator

plus to have parallelism with botocore

Contributor Author

The convention in trio is that close is sync and aclose is async, and there's a decent number of classes that support both when close is a trivial operation that only sets an internal flag or something.

It is quite confusing that StreamingBody/HttpxStreamingBody does not indicate what the actual API is; it might be worth adding some comments there. But the fact that we're doing a thin wrapper around a library-specific object makes it quite tricky to be library-agnostic while also trying to adhere to the botocore API. It's possible we might want to step away from that and define the API more strictly, while still leaving the possibility of accessing the underlying raw stream fairly easily when people want to step outside it.
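The trio convention described above (a trivial sync close, an async aclose delegating to it, plus context-manager support) can be sketched in a few lines. This is illustrative only, not the PR's StreamingBody:

```python
import asyncio

class StreamingBodyShim:
    """Toy body following the trio convention: close() is sync because it
    only flips a flag; aclose() and the async context manager delegate to it."""

    def __init__(self):
        self.closed = False

    def close(self):
        # trivial sync close: just marks the body as closed
        self.closed = True

    async def aclose(self):
        self.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        await self.aclose()
```

Under this convention both spellings are fine, and the context manager remains the recommended usage.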

Comment on lines +273 to +282
if httpx and isinstance(body, httpx.Response):
    # httpx does not support `.aread(1)`
    byte_iterator = body.aiter_raw(1)
    chunk1 = await byte_iterator.__anext__()
    chunk2 = b""
    async for b in byte_iterator:
        chunk2 += b
else:
    chunk1 = await body.read(1)
    chunk2 = await body.read()
Collaborator

it does, but you can't use aread, you need to use the other methods I mentioned in another file

@thehesiod
Collaborator

ok finished first review iteration, sorry for delay! really appreciate all this work, we're getting there!

@jakkdl
Contributor Author

jakkdl commented May 9, 2025

a bunch of tests in test_basic_s3 broke from upstream changes (idk if from PRs in here, in botocore, or somewhere else), will have to figure them out

doesn't look like I managed to harmonize the CI changes either, ugh

response.headers.items()
)

http_response = aiobotocore.awsrequest.HttpxAWSResponse(
Collaborator

btw does this work with SigV4 signing? I'm having a few issues getting this to work on our system due to differences in signature calculation

Collaborator

I'm on the path. We should probably have another set of tests that does HTTP/2; however, I'm not sure if moto supports this. There should also be a config param to enable this.

Collaborator

Ok, figured out my issue: when using HTTP/2 with httpx it's critical not to let it pre-populate the Connection header when creating the Client. My fix:

        self._http_session = await self._exit_stack.enter_async_context(httpx.AsyncClient(http2=True))

        # with http/2 you don't want to have Connection in the headers or it breaks signing when later h2 strips out this header
        del self._http_session.headers['Connection']

Contributor Author

I'm not familiar with this, so it would be great if you could write a test for it.

Collaborator

So this is actually an action item for you: it would be another global httpx test-case scenario where we enable HTTP/2 with httpx via a new config variable.

Collaborator

Actually, I forgot moto doesn't support HTTP/2 yet since it uses werkzeug. I just created an issue to see if they'll add support: werkzeug. However, a work-around is perhaps adding an HTTP/2 in-process proxy.

Collaborator

I asked ChatGPT; here's what it recommended after a little prodding. It's not great, but it's a good starting-off point.

import os
import subprocess
import tempfile
import time

import pytest

from moto.server import ThreadedMotoServer

@pytest.fixture(scope="session")
def shared_self_signed_cert():
    cert_dir = tempfile.TemporaryDirectory()
    cert_path = os.path.join(cert_dir.name, "cert.pem")
    key_path = os.path.join(cert_dir.name, "key.pem")

    subprocess.run([
        "openssl", "req", "-x509", "-newkey", "rsa:2048",
        "-keyout", key_path, "-out", cert_path,
        "-days", "1", "-nodes", "-subj", "/CN=localhost"
    ], check=True)

    yield cert_path, key_path

    cert_dir.cleanup()


@pytest.fixture
def isolated_moto_nghttpx(shared_self_signed_cert):
    cert_path, key_path = shared_self_signed_cert

    # Start Moto on dynamic port
    moto_server = ThreadedMotoServer()
    moto_server.start()

    moto_port = moto_server.port

    # Choose proxy port (could also use port=0 if launching via socket discovery)
    proxy_port = moto_port + 10000

    nghttpx_proc = subprocess.Popen([
        "nghttpx",
        f"-f127.0.0.1,{proxy_port};proto=h2",
        f"-b127.0.0.1,{moto_port}",
        "--tls-cert=" + cert_path,
        "--tls-key=" + key_path,
        "--no-ocsp"
    ])
    time.sleep(0.5)

    yield f"https://localhost:{proxy_port}"

    nghttpx_proc.terminate()
    moto_server.stop()

According to ChatGPT, the Ubuntu runners already have it available, so we just need to add:

- name: Install nghttpx
  run: |
    sudo apt-get update
    sudo apt-get install -y nghttp2

Contributor Author

I can't get the above to work; it fails to connect. I also tried with nginx but hit similar problems. I'm also not sure where to apply your 'Connection' fix.
But could we maybe leave this for a follow-up PR?

Labels
enhancement New feature or request
3 participants