
Chapter 7: d-supervisor.py NotImplementedError, I need help #5

@xiongzhenglong

Description


NotImplementedError Traceback (most recent call last)
in <cell line: 0>()
101 }
102
--> 103 for output in graph.stream(initial_state):
104 print(f"\nStep decision: {output.get('next', 'N/A')}")
105 if output.get("messages"):

8 frames
/usr/local/lib/python3.11/dist-packages/langchain_core/messages/utils.py in _convert_to_message(message)
341 msg = f"Unsupported message type: {type(message)}"
342 msg = create_message(message=msg, error_code=ErrorCode.MESSAGE_COERCION_FAILURE)
--> 343 raise NotImplementedError(msg)
344
345 return _message

NotImplementedError: Unsupported message type: <class '__main__.SupervisorDecision'>
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/MESSAGE_COERCION_FAILURE
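
As far as I can tell from the traceback, the failure happens when a SupervisorDecision instance is passed where LangChain expects a chat message. A standalone sketch (with a simplified SupervisorDecision, not the exact class from my code below) reproduces the same error:

# Standalone sketch of the coercion failure (simplified SupervisorDecision,
# not the exact class defined in the full code below).
from pydantic import BaseModel
from langchain_core.messages import convert_to_messages

class SupervisorDecision(BaseModel):
    next: str

# convert_to_messages accepts BaseMessage objects, dicts, (role, content)
# tuples and strings; an arbitrary Pydantic object raises the same
# MESSAGE_COERCION_FAILURE shown in the traceback above.
convert_to_messages([SupervisorDecision(next="researcher")])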

My full code:
from typing import Literal

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, MessagesState, START
from pydantic import BaseModel
from IPython.display import Image, display

class SupervisorDecision(BaseModel):
    next: Literal["researcher", "coder", "FINISH"]

model = ChatOpenAI(model="openai/gpt-4o-mini", temperature=0, openai_api_base="https://openrouter.ai/api/v1")
model = model.with_structured_output(SupervisorDecision)

agents = ["researcher", "coder"]

system_prompt_part_1 = f"""You are a supervisor tasked with managing a conversation between the
following workers: {agents}. Given the following user request,
respond with the worker to act next. Each worker will perform a
task and respond with their results and status. When finished,
respond with FINISH."""

system_prompt_part_2 = f"""Given the conversation above, who should act next? Or should we FINISH? Select one of: {", ".join(agents)}, FINISH"""

def supervisor(state):
    messages = [
        ("system", system_prompt_part_1),
        *state["messages"],
        ("system", system_prompt_part_2),
    ]
    # Get the supervisor's decision
    decision = model.invoke(messages)
    # Return a SystemMessage containing the decision
    return {"messages": [SystemMessage(content=decision.next)], "next": decision.next}

class AgentState(MessagesState):
    next: Literal["researcher", "coder", "FINISH"]

def researcher(state: AgentState):
    # In a real implementation, this would do research tasks
    response = model.invoke(
        [
            {
                "role": "system",
                "content": "You are a research assistant. Analyze the request and provide relevant information.",
            },
            {"role": "user", "content": state["messages"][0].content},
        ]
    )
    return {"messages": [response]}

def coder(state: AgentState):
    # In a real implementation, this would write code
    response = model.invoke(
        [
            {
                "role": "system",
                "content": "You are a coding assistant. Implement the requested functionality.",
            },
            {"role": "user", "content": state["messages"][0].content},
        ]
    )
    return {"messages": [response]}

builder = StateGraph(AgentState)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)

builder.add_edge(START, "supervisor")

builder.add_conditional_edges("supervisor", lambda state: state["next"])
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")

graph = builder.compile()
image_bytes = graph.get_graph().draw_mermaid_png()
display(Image(data=image_bytes))

initial_state = {
    "messages": [
        {
            "role": "user",
            "content": "I need help analyzing some data and creating a visualization.",
        }
    ],
    "next": "supervisor",
}

for output in graph.stream(initial_state):
    print(f"\nStep decision: {output.get('next', 'N/A')}")
    if output.get("messages"):
        print(f"Response: {output['messages'][-1].content[:100]}...")
