Description

Hello,

We are encountering intermittent `requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))` errors when using the `langchain-ibm` integration with LangGraph's asyncio interfaces (e.g., `ainvoke`). The core issue appears to be that the underlying HTTP client libraries used for API calls and authentication are not fully compatible with an asyncio environment, due to their use of connection pooling.
Example of failing code:

```python
# This call is unreliable in a concurrent asyncio environment
result = await (prompt | ibm_llm | StrOutputParser()).ainvoke({})
```
We have identified two distinct problems stemming from the use of connection pooling.

1. `httpx` Client for API Calls

The `httpx` client used for the primary API requests can lead to connection errors when its default connection pooling is active. We worked around this issue by providing a custom `httpx.AsyncClient` that disables connection reuse by setting the `Connection: close` header. This forces a new connection for each request, avoiding state conflicts between concurrent tasks.
Workaround:

```python
import httpx
from ibm_watsonx_ai.client import APIClient
from ibm_watsonx_ai.credentials import Credentials
from langchain_ibm import ChatWatsonx

# Custom client uses the "Connection: close" header to avoid connection pool issues
custom_async_client = httpx.AsyncClient(
    headers={"Connection": "close"},
    timeout=1200,
)

# Initialize the watsonx APIClient with the custom httpx client
watsonx_client = APIClient(
    credentials=Credentials(url=config["WATSONX_BASE_URL"], api_key=config["WATSONX_API_KEY"]),
    project_id=config["WATSONX_PROJECT_ID"],
    async_httpx_client=custom_async_client,
)

# Instantiate the ChatWatsonx model
chat_model = ChatWatsonx(
    model_id=model_info.name,
    project_id=config["WATSONX_PROJECT_ID"],
    apikey=config["WATSONX_API_KEY"],
    url=config["WATSONX_BASE_URL"],
    watsonx_client=watsonx_client,
)
```
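With the `Connection: close` client in place, concurrent calls no longer share a pooled socket. A minimal sketch of how we exercise it (`prompt` stands in for any prompt template from our application; the `config` keys are the same as above):

```python
import asyncio

from langchain_core.output_parsers import StrOutputParser

async def main():
    chain = prompt | chat_model | StrOutputParser()
    # Ten requests in flight at once; each opens and closes its own
    # connection, so no pooled connection state is shared between tasks.
    results = await asyncio.gather(*(chain.ainvoke({}) for _ in range(10)))
    print(results)

asyncio.run(main())
```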
2. `requests` Client for Authentication (Blocking Issue)

While the first issue can be worked around, the second one is blocking us. Authentication token management is handled by the `ibm-watsonx-ai` package (https://pypi.org/project/ibm-watsonx-ai/). Specifically, the `APIClient` class in `client.py` uses the synchronous `requests` library to fetch and refresh OIDC tokens. Because `requests` uses a connection pool that is not async-safe, concurrent `ainvoke` calls that trigger a token refresh conflict with each other, resulting in the `RemoteDisconnected` error shown in the stack trace below. Since this logic is internal to the `ibm-watsonx-ai` library's authentication methods, we were not able to implement a workaround.
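For background, this failure mode is not specific to watsonx: when a shared `requests.Session` holds a pooled keep-alive connection that the server has since closed, reusing the stale socket can surface as exactly this `ConnectionError`, and overlapping calls widen the window. A schematic illustration (the URL is a placeholder, not the real endpoint):

```python
import requests

# One Session shared across the whole process, as ibm_watsonx_ai does for
# token refresh (iam_auth.py calls self._session.post(...) internally).
session = requests.Session()

def fetch_token():
    # If the server closed the pooled keep-alive connection since the last
    # call, reusing the stale socket can raise
    # ConnectionError(RemoteDisconnected(...)) rather than reconnecting,
    # and overlapping calls from concurrent tasks amplify the problem.
    return session.post(
        "https://iam.example.com/identity/token",  # placeholder URL
        data={"apikey": "..."},
        timeout=30,
    )
```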
Stack Trace

The following stack trace points to the `requests` call within the IAM authentication flow as the source of the error.

<details>
<summary>Click to expand full stack trace</summary>

```
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
Traceback (most recent call last):
File ".../vendor/urllib3/connectionpool.py", line 787, in urlopen
response = self._make_request(
^^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/connectionpool.py", line 534, in _make_request
response = conn.getresponse()
^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/connection.py", line 516, in getresponse
httplib_response = super().getresponse()
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/http/client.py", line 1428, in getresponse
response.begin()
File "/usr/lib/python3.12/http/client.py", line 331, in begin
version, status, reason = self._read_status()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/http/client.py", line 300, in _read_status
raise RemoteDisconnected("Remote end closed connection without"
http.client.RemoteDisconnected: Remote end closed connection without response
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".../vendor/requests/adapters.py", line 589, in send
resp = conn.urlopen(
^^^^^^^^^^^^^
File ".../vendor/urllib3/connectionpool.py", line 841, in urlopen
retries = retries.increment(
^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/util/retry.py", line 474, in increment
raise reraise(type(error), error, _stacktrace)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/util/util.py", line 38, in reraise
raise value.with_traceback(tb)
File ".../vendor/urllib3/connectionpool.py", line 787, in urlopen
response = self._make_request(
^^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/connectionpool.py", line 534, in _make_request
response = conn.getresponse()
^^^^^^^^^^^^^^^^^^
File ".../vendor/urllib3/connection.py", line 516, in getresponse
httplib_response = super().getresponse()
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/http/client.py", line 1428, in getresponse
response.begin()
File "/usr/lib/python3.12/http/client.py", line 331, in begin
version, status, reason = self._read_status()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/http/client.py", line 300, in _read_status
raise RemoteDisconnected("Remote end closed connection without"
urllib3.exceptions.ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".../vendor/langgraph/utils/runnable.py", line 440, in ainvoke
ret = await self.afunc(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../app/api/internal_module/file.py", line 112, in _internal_request_handler
res: str = await chain.ainvoke({}, config={"callbacks": [...]})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/langchain_core/runnables/base.py", line 3089, in ainvoke
input_ = await coro_with_context(part(), context, create_task=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/langchain_core/language_models/chat_models.py", line 394, in ainvoke
llm_result = await self.agenerate_prompt(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/langchain_core/language_models/chat_models.py", line 968, in agenerate_prompt
return await self.agenerate(
^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/langchain_core/language_models/chat_models.py", line 926, in agenerate
raise exceptions[0]
File ".../vendor/langchain_core/language_models/chat_models.py", line 1094, in _agenerate_with_cache
result = await self._agenerate(
^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/langchain_ibm/chat_models.py", line 742, in _agenerate
response = await self.watsonx_model.achat(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/foundation_models/inference/model_inference.py", line 481, in achat
return await self._inference.achat(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/foundation_models/inference/fm_model_inference.py", line 203, in achat
headers=self._client._get_headers(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/client.py", line 818, in _get_headers
else self.token
^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/client.py", line 678, in token
return self._auth_method.get_token()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/utils/auth/base_auth.py", line 112, in get_token
self._save_token_data(self._refresh_token())
^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/utils/auth/base_auth.py", line 135, in _refresh_token
return self._generate_token()
^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/utils/auth/iam_auth.py", line 58, in _generate_token
response = self._session.post(
^^^^^^^^^^^^^^^^^^^
File ".../vendor/requests/sessions.py", line 637, in post
return self.request("POST", url, data=data, json=json, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 98, in wrapper
raise e
File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 84, in wrapper
res = func(*args, **kw)
^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 121, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/ibm_watsonx_ai/_wrappers/requests.py", line 325, in request
return super().request(**{**kwargs, **{"data": data}})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/requests/sessions.py", line 589, in request
resp = self.send(prep, **send_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/requests/sessions.py", line 703, in send
r = adapter.send(request, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File ".../vendor/requests/adapters.py", line 604, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
```

</details>
Proposed Solution

- The authentication mechanism within the `ibm-watsonx-ai` library needs to be updated. The token retrieval and refresh logic in `APIClient` should use an async-native HTTP client (e.g., `httpx.AsyncClient` or `aiohttp`) instead of `requests` when used in an asynchronous context (see the sketch after this list).
- The default `httpx` client used for LLM inference should detect when it is running in an async context and use an async-compatible client instead of the default synchronous one.
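As a sketch of the first point, the IAM token exchange maps naturally onto `httpx.AsyncClient`. The endpoint and grant type below are IBM Cloud IAM's documented token API; the function name `_agenerate_token` is hypothetical, not an existing method:

```python
import httpx

IAM_TOKEN_URL = "https://iam.cloud.ibm.com/identity/token"

async def _agenerate_token(api_key: str) -> dict:
    # Async-native counterpart of the synchronous token fetch in
    # iam_auth.py: exchange an API key for a bearer token without
    # blocking the event loop or sharing a requests.Session.
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            IAM_TOKEN_URL,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
                "apikey": api_key,
            },
        )
        response.raise_for_status()
        # Response JSON includes access_token and expiration fields.
        return response.json()
```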
Recommended Workaround

Until the underlying `ibm-watsonx-ai` library is updated, we recommend documenting that `langchain-ibm` reliably supports only the synchronous LangChain interfaces. Wrapping the synchronous calls in `asyncio.to_thread()` offers a reasonably performant alternative for now.

Example:

```python
result = await asyncio.to_thread((prompt | ibm_llm | StrOutputParser()).invoke, {})
```
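For multiple chains in flight, the same pattern composes with `asyncio.gather`; each synchronous `invoke` runs in its own worker thread, keeping the blocking `requests`-based token refresh off the event loop (a sketch reusing the chain from the examples above):

```python
import asyncio

async def main():
    chain = prompt | ibm_llm | StrOutputParser()
    # Each invoke executes in a thread from the default executor, so the
    # event loop stays responsive while the synchronous HTTP calls run.
    results = await asyncio.gather(
        *(asyncio.to_thread(chain.invoke, {}) for _ in range(5))
    )
    print(results)

asyncio.run(main())
```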