-
To use an asynchronous chat message history, you can implement the async methods on a `BaseChatMessageHistory` subclass, for example:

```python
from typing import List, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import run_in_executor


class InMemoryChatMessageHistory(BaseChatMessageHistory, BaseModel):
    messages: List[BaseMessage] = Field(default_factory=list)

    async def aget_messages(self) -> List[BaseMessage]:
        return self.messages

    def add_message(self, message: BaseMessage) -> None:
        self.messages.append(message)

    async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
        # Run the sync implementation in a thread pool.
        await run_in_executor(None, self.add_messages, messages)

    def clear(self) -> None:
        self.messages = []

    async def aclear(self) -> None:
        await run_in_executor(None, self.clear)
```

To manage the message history more effectively, you can use the `RunnableWithMessageHistory` class with a session-history factory:

```python
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory

store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]


with_message_history = RunnableWithMessageHistory(
    runnable,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

with_message_history.invoke(
    {"ability": "math", "input": "What does cosine mean?"},
    config={"configurable": {"session_id": "abc123"}},
)
```

Addressing Your Specific Issues
Here is an updated version of your example code with these considerations:

```python
import asyncio
import uuid
from pprint import pprint

import psycopg
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from langchain_postgres import PostgresChatMessageHistory

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You're an assistant who's good at {ability}. Respond in 20 words or fewer",
        ),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ]
)
runnable = prompt | model

table_name = "chat_history"
async_connection = None


async def init_async_connection():
    # The async connection can only be created inside an async function.
    global async_connection
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432,
    )


async def aget_session_history(session_id: str) -> BaseChatMessageHistory:
    # Reuse the global async connection for every session.
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        async_connection=async_connection,
    )


awith_message_history = RunnableWithMessageHistory(
    runnable,
    aget_session_history,
    input_messages_key="input",
    history_messages_key="history",
)


async def amain():
    await init_async_connection()
    result = await awith_message_history.ainvoke(
        {"ability": "math", "input": "What does cosine mean?"},
        config={"configurable": {"session_id": str(uuid.uuid4())}},
    )
    pprint(result)


asyncio.run(amain())
```

This implementation ensures that the asynchronous methods are used throughout the chain.
-
Hi @pprados, hope you are well. I am having the same issue with my project. Have you been able to find a solution yet? Thank you
-
Checked other resources
Commit to Help
Example Code
Description
It's impossible to use an async `ChatMessageHistory` with langchain-core.

The `ChatMessageHistory` class is synchronous and doesn't have an async counterpart. This is a problem because the `RunnableWithMessageHistory` class requires a `ChatMessageHistory` object to be passed to it. This means that it's impossible to use an async `ChatMessageHistory` with langchain-core. I can't find any example of how to use it. I will try to create an example of how to use `PostgresChatMessageHistory` in async mode.

There are many problems:

- `_exit_history()`
- `PostgresChatMessageHistory` and sync usage
- `PostgresChatMessageHistory` and async usage

Bug in `_exit_history()`

In `RunnableWithMessageHistory`, `_exit_history()` is called because the chain has `| runnable.with_listeners(on_end=self._exit_history)`. This method is not async and it will raise an error. This method calls `add_messages()` and not `await aadd_messages()`.

Result
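The resulting error is not reproduced above, but to make the reported behaviour concrete, here is a minimal, hypothetical sketch (the `TracingHistory` class and its prints are illustrative, not part of langchain-core) of a history that shows which write path actually gets used:

```python
from typing import List, Sequence

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage


class TracingHistory(BaseChatMessageHistory):
    """In-memory history that reports which write path is used."""

    def __init__(self) -> None:
        self._messages: List[BaseMessage] = []

    @property
    def messages(self) -> List[BaseMessage]:
        return self._messages

    def add_message(self, message: BaseMessage) -> None:
        # Sync path: per the report above, this is what _exit_history() reaches.
        print("sync add_message() called")
        self._messages.append(message)

    async def aadd_messages(self, messages: Sequence[BaseMessage]) -> None:
        # Async path: per the report above, the on_end listener never awaits this.
        print("async aadd_messages() called")
        self._messages.extend(messages)

    def clear(self) -> None:
        self._messages = []
```

Plugging such a history into `RunnableWithMessageHistory` and calling `ainvoke()` would, according to the behaviour described above, still go through the sync `add_message()` path via `_exit_history()`.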
Bugs in `PostgresChatMessageHistory` and sync usage

In `PostgresChatMessageHistory`, the design is problematic.

LangChain, with LCEL, is declarative programming. You have to declare a chain in global variables, then invoke it when necessary. This is how langserve is able to publish interfaces with `add_routes()`.

For optimization reasons, `PostgresChatMessageHistory` wishes to recycle connections. The class provides a constructor which accepts a `sync_connection` parameter. However, it is not possible to have a global connection in order to reuse it when implementing `get_session_history()`.

A connection is not reentrant! You can't use the same connection in multiple threads. But the design of langchain-postgres is to have a global connection. This is a problem.

The alternative is to create a new connection each time you need to access the database, as sketched below.

Then, why accept only a connection and not an engine? The engine is a connection pool.
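As an illustration of that alternative, here is a minimal sketch of a per-call connection factory (the connection parameters and the `chat_history` table name are reused from the example above; the `sync_connection` keyword is assumed to match langchain_postgres 0.0.6):

```python
import psycopg
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_postgres import PostgresChatMessageHistory

table_name = "chat_history"


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    # Open a fresh connection on every call instead of sharing a single,
    # non-reentrant global connection. Note that nothing ever closes it:
    # LCEL gives no obvious hook for releasing the resource afterwards.
    sync_connection = psycopg.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432,
    )
    return PostgresChatMessageHistory(
        table_name,
        session_id,
        sync_connection=sync_connection,
    )
```

An engine-style parameter (for example a `psycopg_pool.ConnectionPool` from which each call borrows and then returns a connection) would avoid both the reentrancy problem and the leak, which is what the question above is asking for.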
Bugs in `PostgresChatMessageHistory` and async usage

If we ignore the problem mentioned above with `_exit_history()`, there are even more difficulties. It's not easy to initialize a global async connection, because it must be initialized in an async function. And it's not possible to call `init_async_connection()` in `get_session_history()`: `get_session_history()` is not async. It's a problem.

It is therefore currently impossible to implement session history correctly in asynchronous mode. Either you use a global connection, but that's not possible, or you open the connection in `get_session_history()`, but that's impossible.

The only solution is to completely break the use of LCEL, by building the chain just after the connection is opened (see the sketch below). It's still very strange. To publish it with langserve, you need to use a `RunnableLambda`. It's a very strange way to use LangChain.

But a good use of LangChain in a website consists precisely in using only asynchronous approaches. This must include history management.
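For illustration, here is a hypothetical sketch of that workaround: the chain is only assembled once the async connection exists, and the whole thing is wrapped in a `RunnableLambda` so it can still be published. The `answer` function name is an assumption, and `runnable` (the `prompt | model` chain) is assumed to be defined as in the example at the top of this thread.

```python
import uuid

import psycopg
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables import RunnableLambda
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_postgres import PostgresChatMessageHistory

table_name = "chat_history"


async def answer(inputs: dict) -> str:
    # 1. Open the async connection first (only possible inside an async function).
    async_connection = await psycopg.AsyncConnection.connect(
        user="postgres",
        password="password_postgres",
        host="localhost",
        port=5432,
    )
    try:
        # 2. Only now can the history factory capture the connection...
        def get_session_history(session_id: str) -> BaseChatMessageHistory:
            return PostgresChatMessageHistory(
                table_name,
                session_id,
                async_connection=async_connection,
            )

        # 3. ...and the chain be built, which breaks the declarative LCEL style.
        chain = RunnableWithMessageHistory(
            runnable,  # prompt | model, declared elsewhere
            get_session_history,
            input_messages_key="input",
            history_messages_key="history",
        )
        result = await chain.ainvoke(
            {"ability": inputs["ability"], "input": inputs["input"]},
            config={
                "configurable": {
                    "session_id": inputs.get("session_id", str(uuid.uuid4()))
                }
            },
        )
        return result.content
    finally:
        await async_connection.close()


# The only object left to publish (e.g. with langserve) is this wrapper:
chain_for_serving = RunnableLambda(answer)
```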
What is the best solution?
System Info
langchain==0.1.20
langchain-community==0.0.38
langchain-core==0.2.1
langchain-openai==0.1.7
langchain_postgres==0.0.6
langchain-rag==0.1.46
langchain-text-splitters==0.0.1
Ubuntu