Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The following HTTP libraries are supported:
- ``urllib2``
- ``urllib3``
- ``httpx``
- ``httpcore``

Speed
-----
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def find_version(*file_paths):
"tests": [
"aiohttp",
"boto3",
"httpcore",
"httplib2",
"httpx",
"pytest-aiohttp",
Expand Down
25 changes: 14 additions & 11 deletions vcr/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@


try:
import httpx
import httpcore
except ImportError: # pragma: no cover
pass
else:
_HttpxSyncClient_send_single_request = httpx.Client._send_single_request
_HttpxAsyncClient_send_single_request = httpx.AsyncClient._send_single_request
_HttpcoreConnectionPool_handle_request = httpcore.ConnectionPool.handle_request
_HttpcoreAsyncConnectionPool_handle_async_request = httpcore.AsyncConnectionPool.handle_async_request


class CassettePatcherBuilder:
Expand All @@ -121,7 +121,7 @@ def build(self):
self._httplib2(),
self._tornado(),
self._aiohttp(),
self._httpx(),
self._httpcore(),
self._build_patchers_from_mock_triples(self._cassette.custom_patches),
)

Expand Down Expand Up @@ -304,19 +304,22 @@ def _aiohttp(self):
yield client.ClientSession, "_request", new_request

@_build_patchers_from_mock_triples_decorator
def _httpx(self):
def _httpcore(self):
try:
import httpx
import httpcore
except ImportError: # pragma: no cover
return
else:
from .stubs.httpx_stubs import async_vcr_send, sync_vcr_send
from .stubs.httpcore_stubs import vcr_handle_async_request, vcr_handle_request

new_async_client_send = async_vcr_send(self._cassette, _HttpxAsyncClient_send_single_request)
yield httpx.AsyncClient, "_send_single_request", new_async_client_send
new_handle_async_request = vcr_handle_async_request(
self._cassette,
_HttpcoreAsyncConnectionPool_handle_async_request,
)
yield httpcore.AsyncConnectionPool, "handle_async_request", new_handle_async_request

new_sync_client_send = sync_vcr_send(self._cassette, _HttpxSyncClient_send_single_request)
yield httpx.Client, "_send_single_request", new_sync_client_send
new_handle_request = vcr_handle_request(self._cassette, _HttpcoreConnectionPool_handle_request)
yield httpcore.ConnectionPool, "handle_request", new_handle_request

def _urllib3_patchers(self, cpool, conn, stubs):
http_connection_remover = ConnectionRemover(
Expand Down
215 changes: 215 additions & 0 deletions vcr/stubs/httpcore_stubs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import asyncio
import functools
import logging
from collections import defaultdict
from collections.abc import AsyncIterable, Iterable

from httpcore import Response
from httpcore._models import ByteStream

from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes

_logger = logging.getLogger(__name__)


async def _convert_byte_stream(stream):
if isinstance(stream, Iterable):
return list(stream)

if isinstance(stream, AsyncIterable):
return [part async for part in stream]

raise TypeError(
f"_convert_byte_stream: stream must be Iterable or AsyncIterable, got {type(stream).__name__}",
)


def _serialize_headers(real_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore serialize every header key to a list of values.
"""

headers = defaultdict(list)

for name, value in real_response.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))

return dict(headers)


async def _serialize_response(real_response):
# The reason_phrase may not exist
try:
reason_phrase = real_response.extensions["reason_phrase"].decode("ascii")
except KeyError:
reason_phrase = None

# Reading the response stream consumes the iterator, so we need to restore it afterwards
content = b"".join(await _convert_byte_stream(real_response.stream))
real_response.stream = ByteStream(content)

return {
"status": {"code": real_response.status, "message": reason_phrase},
"headers": _serialize_headers(real_response),
"body": {"string": content},
}


def _deserialize_headers(headers):
"""
httpcore accepts headers as list of tuples of header key and value.
"""

return [
(name.encode("ascii"), value.encode("ascii")) for name, values in headers.items() for value in values
]


def _deserialize_response(vcr_response):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in vcr_response:
vcr_response = decode_response(
convert_body_to_bytes(
{
"headers": vcr_response["headers"],
"body": {"string": vcr_response["content"]},
"status": {"code": vcr_response["status_code"]},
},
),
)
extensions = None
else:
extensions = (
{"reason_phrase": vcr_response["status"]["message"].encode("ascii")}
if vcr_response["status"]["message"]
else None
)

return Response(
vcr_response["status"]["code"],
headers=_deserialize_headers(vcr_response["headers"]),
content=vcr_response["body"]["string"],
extensions=extensions,
)


async def _make_vcr_request(real_request):
# Reading the request stream consumes the iterator, so we need to restore it afterwards
body = b"".join(await _convert_byte_stream(real_request.stream))
real_request.stream = ByteStream(body)

uri = bytes(real_request.url).decode("ascii")

# As per HTTPX: If there are multiple headers with the same key, then we concatenate them with commas
headers = defaultdict(list)

for name, value in real_request.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))

headers = {name: ", ".join(values) for name, values in headers.items()}

return VcrRequest(real_request.method.decode("ascii"), uri, body, headers)


async def _vcr_request(cassette, real_request):
vcr_request = await _make_vcr_request(real_request)

if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, vcr_request)

if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(
cassette=cassette,
failed_request=vcr_request,
)

_logger.info("%s not in cassette, sending to real server", vcr_request)

return vcr_request, None


async def _record_responses(cassette, vcr_request, real_response):
cassette.append(vcr_request, await _serialize_response(real_response))


def _play_responses(cassette, vcr_request):
vcr_response = cassette.play_response(vcr_request)
real_response = _deserialize_response(vcr_response)

return real_response


async def _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
):
vcr_request, vcr_response = await _vcr_request(cassette, real_request)

if vcr_response:
return vcr_response

real_response = await real_handle_async_request(self, real_request)
await _record_responses(cassette, vcr_request, real_response)

return real_response


def vcr_handle_async_request(cassette, real_handle_async_request):
@functools.wraps(real_handle_async_request)
def _inner_handle_async_request(self, real_request):
return _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
)

return _inner_handle_async_request


def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
Copy link

@landrybr landrybr Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seowalex This PR solves some problems with httpx support I'm dealing with. But it seems like there's a problem here caused by the future not being properly awaited. I get RuntimeError: await wasn't used with future.

This can be solved by running it in a separate thread with something like

def run_in_thread():
    return asyncio.run(sync_func(*args, **kwargs))

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(run_in_thread)
    return future.result()



def _vcr_handle_request(cassette, real_handle_request, self, real_request):
vcr_request, vcr_response = _run_async_function(
_vcr_request,
cassette,
real_request,
)

if vcr_response:
return vcr_response

real_response = real_handle_request(self, real_request)
_run_async_function(_record_responses, cassette, vcr_request, real_response)

return real_response


def vcr_handle_request(cassette, real_handle_request):
@functools.wraps(real_handle_request)
def _inner_handle_request(self, real_request):
return _vcr_handle_request(cassette, real_handle_request, self, real_request)

return _inner_handle_request
Loading
Loading