Replies: 1 comment
To incorporate your Python_Pandas agent into a LangGraph workflow, you can follow the example below:

```python
import os
import getpass
import pandas as pd
from langchain_azure_dynamic_sessions import SessionsPythonREPLTool
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage, BaseMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import RunnablePassthrough
from langchain_core.tools import tool
from langchain_openai import AzureChatOpenAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from matplotlib.pyplot import imshow
from PIL import Image
import io
import base64
import json
import operator
from typing import Annotated, List, Literal, Optional, Sequence, TypedDict
# Set credentials
os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Azure OpenAI API key")
os.environ["AZURE_OPENAI_ENDPOINT"] = getpass.getpass("Azure OpenAI endpoint")
AZURE_OPENAI_DEPLOYMENT_NAME = getpass.getpass("Azure OpenAI deployment name")
SESSIONS_POOL_MANAGEMENT_ENDPOINT = getpass.getpass("Azure Container Apps dynamic sessions pool management endpoint")
SQL_DB_CONNECTION_STRING = getpass.getpass("PostgreSQL connection string")
# Instantiate model, DB, code interpreter
db = SQLDatabase.from_uri(SQL_DB_CONNECTION_STRING)
llm = AzureChatOpenAI(deployment_name=AZURE_OPENAI_DEPLOYMENT_NAME, openai_api_version="2024-02-01")
repl = SessionsPythonREPLTool(pool_management_endpoint=SESSIONS_POOL_MANAGEMENT_ENDPOINT)
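
# Note: SessionsPythonREPLTool runs code remotely in an Azure Container Apps
# dynamic session (a sandboxed container), so DataFrames created locally must
# be uploaded to the session before generated code can read them
# (see _upload_dfs_to_repl below).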
# Define State
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]


class RawToolMessage(ToolMessage):
    """ToolMessage that also carries the raw tool output and the tool's name."""

    raw: dict
    tool_name: str
# Define Nodes
class create_df_from_sql(BaseModel):
    """Execute a PostgreSQL SELECT statement and store the results in a pandas DataFrame."""

    select_query: str = Field(..., description="A PostgreSQL SELECT statement.")
    df_columns: List[str] = Field(..., description="Ordered names to give the DataFrame columns.")
    df_name: str = Field(..., description="The name to give the DataFrame variable in downstream code.")


class python_shell(BaseModel):
    """Execute Python code in the sandboxed REPL."""

    code: str = Field(..., description="The code to execute. Make sure to print any important results.")
system_prompt = f"""\
You are an expert at PostgreSQL and Python. You have access to a PostgreSQL database \
with the following tables
{db.table_info}
Given a user question related to the data in the database, \
first get the relevant data from the table as a DataFrame using the create_df_from_sql tool. Then use the \
python_shell to do any analysis required to answer the user question."""
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("placeholder", "{messages}"),
    ]
)
def call_model(state: AgentState) -> dict:
    messages = []
    chain = prompt | llm.bind_tools([create_df_from_sql, python_shell])
    messages.append(chain.invoke({"messages": state["messages"]}))
    return {"messages": messages}
def execute_sql_query(state: AgentState) -> dict:
    """Run any create_df_from_sql tool calls against the DB and stash the resulting DataFrames."""
    messages = []
    for tool_call in state["messages"][-1].tool_calls:
        if tool_call["name"] != "create_df_from_sql":
            continue
        # Execute the generated SELECT and build a DataFrame from the cursor results.
        res = db.run(tool_call["args"]["select_query"], fetch="cursor").fetchall()
        df_columns = tool_call["args"]["df_columns"]
        df = pd.DataFrame(res, columns=df_columns)
        df_name = tool_call["args"]["df_name"]
        messages.append(
            RawToolMessage(
                f"Generated dataframe {df_name} with columns {df_columns}",
                raw={df_name: df},
                tool_call_id=tool_call["id"],
                tool_name=tool_call["name"],
            )
        )
    return {"messages": messages}
def _upload_dfs_to_repl(state: AgentState) -> str:
    """Upload the generated DataFrames to the code interpreter and return code that loads them."""
    df_dicts = [
        msg.raw
        for msg in state["messages"]
        if isinstance(msg, RawToolMessage) and msg.tool_name == "create_df_from_sql"
    ]
    name_df_map = {name: df for df_dict in df_dicts for name, df in df_dict.items()}
    for name, df in name_df_map.items():
        buffer = io.StringIO()
        df.to_csv(buffer)
        buffer.seek(0)
        repl.upload_file(data=buffer, remote_file_path=name + ".csv")
    df_code = "import pandas as pd\n" + "\n".join(
        f"{name} = pd.read_csv('/mnt/data/{name}.csv')" for name in name_df_map
    )
    return df_code
def _repl_result_to_msg_content(repl_result: dict) -> str:
    """Render a REPL result as message content, displaying any returned images inline."""
    content = {}
    for k, v in repl_result.items():
        if isinstance(v, dict) and v["type"] == "image":
            # Decode and show base64-encoded images (display() is available in Jupyter/IPython).
            base64_str = v["base64_data"]
            img = Image.open(io.BytesIO(base64.decodebytes(bytes(base64_str, "utf-8"))))
            display(img)
        else:
            content[k] = v
    return json.dumps(content, indent=2)
def execute_python(state: AgentState) -> dict:
    messages = []
    # Make the SQL-generated DataFrames available inside the remote session first.
    df_code = _upload_dfs_to_repl(state)
    last_ai_msg = [msg for msg in state["messages"] if isinstance(msg, AIMessage)][-1]
    for tool_call in last_ai_msg.tool_calls:
        if tool_call["name"] != "python_shell":
            continue
        generated_code = tool_call["args"]["code"]
        repl_result = repl.execute(df_code + "\n" + generated_code)
        messages.append(
            RawToolMessage(
                _repl_result_to_msg_content(repl_result),
                raw=repl_result,
                tool_call_id=tool_call["id"],
                tool_name=tool_call["name"],
            )
        )
    return {"messages": messages}
# Define Edges
def should_continue(state: AgentState) -> str:
return "execute_sql_query" if state["messages"][-1].tool_calls else END
workflow = StateGraph(AgentState)
workflow.add_node("call_model", call_model)
workflow.add_node("execute_sql_query", execute_sql_query)
workflow.add_node("execute_python", execute_python)
workflow.set_entry_point("call_model")
workflow.add_edge("execute_sql_query", "execute_python")
workflow.add_edge("execute_python", "call_model")
workflow.add_conditional_edges("call_model", should_continue)
app = workflow.compile()
print(app.get_graph().draw_ascii())
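# Based on the edges above, the drawn graph should show call_model routing
# conditionally to execute_sql_query (when the model made tool calls) or END,
# with execute_sql_query -> execute_python -> call_model closing the loop.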
# Test it out
output = app.invoke({"messages": [("human", "graph the average latency by model")]})
print(output["messages"][-1].content)
```

This code sets up a LangGraph workflow that integrates a Python_Pandas agent to query a PostgreSQL database and execute Python code for data analysis. Make sure to install the necessary packages before running it:

```
pip install langchain-chroma langchain_community tiktoken langchain-openai langchainhub langchain langgraph
```

(The example also imports langchain-azure-dynamic-sessions, pandas, matplotlib, and Pillow, so install those as well.)

For more advanced data analysis applications, you can refer to the LangChain documentation on how to build Q&A systems over CSV files and SQL databases. This includes using tools like Pandas for data manipulation and analysis, and creating agents that can iteratively execute code while maintaining the inputs and outputs of previous executions [4][5][6].
Description
I am very fascinated by the LangGraph architecture and am exploring it as a newbie. I am able to execute workflows and create pandas agents individually without any issues. However, my current task involves combining both to produce meaningful research. Is that possible? Could you provide a sample code to help me get started? I would greatly appreciate it.
System Info
Name: langchain
Version: 0.2.15
Name: langgraph
Version: 0.2.15