-
Notifications
You must be signed in to change notification settings - Fork 88
Description
NotImplementedError Traceback (most recent call last)
in <cell line: 0>()
101 }
102
--> 103 for output in graph.stream(initial_state):
104 print(f"\nStep decision: {output.get('next', 'N/A')}")
105 if output.get("messages"):
8 frames
/usr/local/lib/python3.11/dist-packages/langchain_core/messages/utils.py in _convert_to_message(message)
341 msg = f"Unsupported message type: {type(message)}"
342 msg = create_message(message=msg, error_code=ErrorCode.MESSAGE_COERCION_FAILURE)
--> 343 raise NotImplementedError(msg)
344
345 return _message
NotImplementedError: Unsupported message type: <class 'main.SupervisorDecision'>
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/MESSAGE_COERCION_FAILURE
my full code:
from typing import Literal
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
from pydantic import BaseModel
from IPython.display import Image, display
class SupervisorDecision(BaseModel):
next: Literal["researcher", "coder", "FINISH"]
model = ChatOpenAI(model="openai/gpt-4o-mini",temperature=0,openai_api_base="https://openrouter.ai/api/v1")
model = model.with_structured_output(SupervisorDecision)
agents = ["researcher", "coder"]
system_prompt_part_1 = f"""You are a supervisor tasked with managing a conversation between the
following workers: {agents}. Given the following user request,
respond with the worker to act next. Each worker will perform a
task and respond with their results and status. When finished,
respond with FINISH."""
system_prompt_part_2 = f"""Given the conversation above, who should act next? Or should we FINISH? Select one of: {", ".join(agents)}, FINISH"""
def supervisor(state):
messages = [
("system", system_prompt_part_1),
*state["messages"],
("system", system_prompt_part_2),
]
# Get the supervisor's decision
decision = model.invoke(messages)
# Return a SystemMessage containing the decision
return {"messages": [SystemMessage(content=decision.next)] , "next": decision.next}
class AgentState(MessagesState):
next: Literal["researcher", "coder", "FINISH"]
def researcher(state: AgentState):
# In a real implementation, this would do research tasks
response = model.invoke(
[
{
"role": "system",
"content": "You are a research assistant. Analyze the request and provide relevant information.",
},
{"role": "user", "content": state["messages"][0].content},
]
)
return {"messages": [response]}
def coder(state: AgentState):
# In a real implementation, this would write code
response = model.invoke(
[
{
"role": "system",
"content": "You are a coding assistant. Implement the requested functionality.",
},
{"role": "user", "content": state["messages"][0].content},
]
)
return {"messages": [response]}
builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges("supervisor", lambda state: state["next"])
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")
graph = builder.compile()
image_bytes = graph.get_graph().draw_mermaid_png()
display(Image(data=image_bytes))
initial_state = {
"messages": [
{
"role": "user",
"content": "I need help analyzing some data and creating a visualization.",
}
],
"next": "supervisor",
}
for output in graph.stream(initial_state):
print(f"\nStep decision: {output.get('next', 'N/A')}")
if output.get("messages"):
print(f"Response: {output['messages'][-1].content[:100]}...")