Bug: asyncio calls fail due to synchronous, connection-pooled HTTP clients in underlying libraries #83

@noobgramming

Description

Hello,

We are encountering intermittent requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')) errors when using the langchain-ibm integration with LangGraph's asyncio interfaces (e.g., ainvoke).

The core issue appears to be that the underlying HTTP client libraries used for API calls and authentication are not fully compatible with an asyncio environment, due to their use of connection pooling.

Example of failing code:

# This call is unreliable in a concurrent asyncio environment
result = await (prompt | ibm_llm | StrOutputParser()).ainvoke({})
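
The failure is tied to concurrency. A minimal sketch of the pattern that triggers it for us (here chain stands in for prompt | ibm_llm | StrOutputParser(), whose construction is elided):

import asyncio

async def main():
    # Fan out several concurrent invocations of the same chain; some of
    # them fail intermittently with RemoteDisconnected.
    results = await asyncio.gather(*(chain.ainvoke({}) for _ in range(10)))

asyncio.run(main())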

We have identified two distinct problems, both stemming from the use of connection pooling:


1. httpx Client for API Calls

The httpx client used for the primary API requests can lead to connection errors when its default connection pooling is active.

We worked around this issue by providing a custom httpx.AsyncClient that disables connection pooling by setting the Connection: close header. This forces a new connection for each request, avoiding state conflicts between concurrent tasks.

Workaround:

import httpx
from ibm_watsonx_ai.client import APIClient
from ibm_watsonx_ai.credentials import Credentials
from langchain_ibm import ChatWatsonx

# Custom client uses "Connection: close" header to avoid connection pool issues
custom_async_client = httpx.AsyncClient(
    headers={"Connection": "close"},
    timeout=1200
)

# Initialize Watsonx APIClient with the custom httpx client
watsonx_client = APIClient(
    credentials=Credentials(url=config["WATSONX_BASE_URL"], api_key=config["WATSONX_API_KEY"]),
    project_id=config["WATSONX_PROJECT_ID"],
    async_httpx_client=custom_async_client,
)

# Instantiate the ChatWatsonx model
chat_model = ChatWatsonx(
    model_id=model_info.name,
    project_id=config["WATSONX_PROJECT_ID"],
    apikey=config["WATSONX_API_KEY"],
    url=config["WATSONX_BASE_URL"],
    watsonx_client=watsonx_client,
)
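
With this client in place, the primary API calls no longer hit the pooling issue, and the chain from the failing example can be awaited as before:

result = await (prompt | chat_model | StrOutputParser()).ainvoke({})

However, this only covers the inference path; the authentication path described next still fails.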

2. requests Client for Authentication (Blocking Issue)

While the first issue can be worked around, the second one is blocking us. The authentication token management is handled by the ibm-watsonx-ai package (https://pypi.org/project/ibm-watsonx-ai/).

Specifically, the APIClient class in client.py uses the synchronous requests library to fetch and refresh OIDC tokens. Because requests uses a connection pool that is not async-safe, concurrent ainvoke calls that trigger a token refresh will conflict with each other. This results in the RemoteDisconnected error shown in the stack trace below.
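
Condensed from the stack trace below, the problematic path looks roughly like this (the class and method bodies are paraphrased for illustration, not the library's actual source):

# Paraphrased shape of ibm_watsonx_ai's IAM auth path, per the stack trace:
import requests

class IAMAuth:
    def __init__(self) -> None:
        # One synchronous Session (and thus one urllib3 connection pool)
        # shared by every caller that needs a token.
        self._session = requests.Session()

    def _generate_token(self) -> dict:
        # Blocking POST, reached from async code via _get_headers() -> token;
        # concurrent tasks that trigger a refresh all funnel through here.
        response = self._session.post("https://iam.cloud.ibm.com/identity/token")
        return response.json()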

Since this logic is internal to the ibm-watsonx-ai library's authentication methods, we were not able to implement a workaround.

Stack Trace

The following stack trace points to the requests call within the IAM authentication flow as the source of the error.

requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
Traceback (most recent call last):
  File ".../vendor/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/connection.py", line 516, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 1428, in getresponse
    response.begin()
  File "/usr/lib/python3.12/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 300, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".../vendor/requests/adapters.py", line 589, in send
    resp = conn.urlopen(
           ^^^^^^^^^^^^^
  File ".../vendor/urllib3/connectionpool.py", line 841, in urlopen
    retries = retries.increment(
              ^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/util/retry.py", line 474, in increment
    raise reraise(type(error), error, _stacktrace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/util/util.py", line 38, in reraise
    raise value.with_traceback(tb)
  File ".../vendor/urllib3/connectionpool.py", line 787, in urlopen
    response = self._make_request(
               ^^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/connectionpool.py", line 534, in _make_request
    response = conn.getresponse()
               ^^^^^^^^^^^^^^^^^^
  File ".../vendor/urllib3/connection.py", line 516, in getresponse
    httplib_response = super().getresponse()
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 1428, in getresponse
    response.begin()
  File "/usr/lib/python3.12/http/client.py", line 331, in begin
    version, status, reason = self._read_status()
                              ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/http/client.py", line 300, in _read_status
    raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".../vendor/langgraph/utils/runnable.py", line 440, in ainvoke
    ret = await self.afunc(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../app/api/internal_module/file.py", line 112, in _internal_request_handler
    res: str = await chain.ainvoke({}, config={"callbacks": [...]})
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/langchain_core/runnables/base.py", line 3089, in ainvoke
    input_ = await coro_with_context(part(), context, create_task=True)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/langchain_core/language_models/chat_models.py", line 394, in ainvoke
    llm_result = await self.agenerate_prompt(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/langchain_core/language_models/chat_models.py", line 968, in agenerate_prompt
    return await self.agenerate(
           ^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/langchain_core/language_models/chat_models.py", line 926, in agenerate
    raise exceptions[0]
  File ".../vendor/langchain_core/language_models/chat_models.py", line 1094, in _agenerate_with_cache
    result = await self._agenerate(
             ^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/langchain_ibm/chat_models.py", line 742, in _agenerate
    response = await self.watsonx_model.achat(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/foundation_models/inference/model_inference.py", line 481, in achat
    return await self._inference.achat(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/foundation_models/inference/fm_model_inference.py", line 203, in achat
    headers=self._client._get_headers(),
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/client.py", line 818, in _get_headers
    else self.token
         ^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/client.py", line 678, in token
    return self._auth_method.get_token()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/utils/auth/base_auth.py", line 112, in get_token
    self._save_token_data(self._refresh_token())
                          ^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/utils/auth/base_auth.py", line 135, in _refresh_token
    return self._generate_token()
           ^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/utils/auth/iam_auth.py", line 58, in _generate_token
    response = self._session.post(
               ^^^^^^^^^^^^^^^^^^^
  File ".../vendor/requests/sessions.py", line 637, in post
    return self.request("POST", url, data=data, json=json, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 98, in wrapper
    raise e
  File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 84, in wrapper
    res = func(*args, **kw)
          ^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 121, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 325, in request
    return super().request(**{**kwargs, **{"data": data}})
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../vendor/requests/adapters.py", line 604, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

Proposed Solution

  1. The authentication mechanism within the ibm-watsonx-ai library needs to be updated. The token retrieval and refresh logic in APIClient should use an async-native HTTP client (e.g., httpx.AsyncClient or aiohttp) instead of requests when invoked from an asynchronous context (a sketch of this direction follows the list).
  2. The default httpx client used for LLM inference should detect when it is running in an async context and use an async-compatible client instead of the default synchronous one.
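
As a sketch of direction 1: the token exchange could be performed with httpx.AsyncClient. The endpoint and form fields below follow IBM Cloud's documented IAM token API, but the function itself is hypothetical, not part of the library:

import httpx

IAM_TOKEN_URL = "https://iam.cloud.ibm.com/identity/token"

async def fetch_iam_token(api_key: str, client: httpx.AsyncClient) -> str:
    # Exchange an API key for a bearer token without blocking the event loop.
    response = await client.post(
        IAM_TOKEN_URL,
        data={
            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
            "apikey": api_key,
        },
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    response.raise_for_status()
    return response.json()["access_token"]

Guarding this call with an asyncio.Lock would additionally ensure that only one task performs a refresh at a time.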

Recommended Workaround

Until the underlying ibm-watsonx-ai library is updated, we recommend documenting that langchain-ibm supports only synchronous LangChain interfaces. Wrapping the synchronous calls in asyncio.to_thread() offers a reasonably performant alternative for now.

Example:

import asyncio

result = await asyncio.to_thread((prompt | ibm_llm | StrOutputParser()).invoke, {})
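
This works because asyncio.to_thread() runs the synchronous invoke on a worker thread, keeping the event loop unblocked while still using the code path the library supports today.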
