Code in chapter 8 gets a bit sketchy, and the missing bits are not necessarily obvious.
Here's a rough pass at the Human Interruption code. It also includes a few other useful techniques and functions, like running Ollama via its OpenAI-compatible API.
### Imports
```python
import ast, operator
from typing import Annotated, TypedDict
from uuid import uuid4
import sys
import asyncio
from contextlib import aclosing

from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, ToolCall
from langchain_core.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun

# LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver
```
```python
def arithmetic_eval(s):
    """Safe arithmetic expression evaluator that parses and evaluates
    mathematical expressions from strings without using Python's
    potentially dangerous eval() function."""
    binOps = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Mod: operator.mod,
    }
    unOps = {
        ast.USub: operator.neg,
    }
    node = ast.parse(s, mode='eval')

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        elif isinstance(node, ast.Constant):  # replaces deprecated ast.Str/ast.Num
            return node.value
        elif isinstance(node, ast.BinOp):
            return binOps[type(node.op)](_eval(node.left), _eval(node.right))
        elif isinstance(node, ast.UnaryOp):
            return unOps[type(node.op)](_eval(node.operand))
        else:
            raise ValueError('Unsupported node type {}'.format(node))

    return _eval(node.body)
```
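A couple of quick checks that the AST walker handles nesting, precedence, and unary minus (the expressions are arbitrary examples):

```python
print(arithmetic_eval("2 * (3 + 4)"))  # nested BinOp -> 14
print(arithmetic_eval("-5 + 10 % 3"))  # UnaryOp and Mod -> -4
```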
```python
@tool
def calculator(query: str) -> str:
    """evaluate a mathematical expression"""
    try:
        result = ast.literal_eval(query)
    except (ValueError, SyntaxError):
        # literal_eval only handles literals, not simple expressions like "2 + 3"
        result = arithmetic_eval(query)
    return str(result)
```
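The tool can be sanity-checked directly: plain literals take the `ast.literal_eval` path, and anything with operators falls through to `arithmetic_eval` (inputs are just examples):

```python
print(calculator.invoke("42"))           # literal_eval path -> "42"
print(calculator.invoke("3 * (4 + 5)"))  # arithmetic_eval fallback -> "27"
```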
```python
# Setup
search = DuckDuckGoSearchRun()  # FYI: the underlying library is deprecated
ourTools = [search, calculator]

# Using Ollama via its OpenAI-compatible endpoint - it's freeeeeee!
useOpenAIProxy = True
if useOpenAIProxy:
    ourModel = ChatOpenAI(
        model="qwen2.5:7b",
        base_url="http://localhost:11434/v1",
        api_key="ollama",
        temperature=0,
    ).bind_tools(ourTools)
```
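Note that `ourModel` is only defined when `useOpenAIProxy` is true. If you want the flag to actually toggle, something like the following fallback works; it assumes `OPENAI_API_KEY` is set in your environment, and the model name is just an example:

```python
if not useOpenAIProxy:
    # Hypothetical fallback: hit the real OpenAI API instead of local Ollama.
    # Assumes OPENAI_API_KEY is set; "gpt-4o-mini" is only an example model.
    ourModel = ChatOpenAI(model="gpt-4o-mini", temperature=0).bind_tools(ourTools)
```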
```python
class State(TypedDict):
    messages: Annotated[list, add_messages]


async def model_node(state: State) -> State:
    """Async version of a node.

    Args:
        state (State): current graph state

    Returns:
        State: state update containing the model's response
    """
    # need to use await and ainvoke here for async
    res = await ourModel.ainvoke(state["messages"])
    return {"messages": res}
```
```python
# Let's build this mother - async style
async def main():
    # Setup
    builder = StateGraph(State)
    builder.add_node("model", model_node)
    builder.add_node("tools", ToolNode(ourTools))
    builder.add_edge(START, "model")
    builder.add_conditional_edges("model", tools_condition)
    builder.add_edge("tools", "model")
    ourGraph = builder.compile(checkpointer=MemorySaver())

    # Initialize the async primitive
    event = asyncio.Event()

    # Normal input
    ourInput = {"messages": [HumanMessage("""How old was the 30th President of the United States when he died?""")]}
    config = {"configurable": {"thread_id": "1"}}

    print("Okay, ready to run this graph!")
    print(ourGraph)

    # use aclosing to close the stream properly when interrupted
    # astream is just the async version of stream
    async with aclosing(ourGraph.astream(ourInput, config)) as stream:
        async for chunk in stream:
            if event.is_set():
                # if the flag is set, stop consuming the stream
                break
            else:
                print(chunk)
            # simulate an interruption: set the flag 2 seconds after the first chunk
            await asyncio.sleep(2)
            event.set()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Whoah, was not expecting to be interrupted!")
        sys.exit(1)
    except asyncio.CancelledError:
        print("Main: Received cancellation, shutting down gracefully...hahaha")
        sys.exit(1)
```
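One useful follow-on: since the graph was compiled with a `MemorySaver` checkpointer, the interrupted thread's progress survives the break. A minimal sketch of resuming, assuming the same `config`/`thread_id` and that this runs at the end of `main()`:

```python
# Resume the interrupted thread: streaming None as the input continues
# from the last checkpoint for this thread_id instead of starting over.
async with aclosing(ourGraph.astream(None, config)) as stream:
    async for chunk in stream:
        print(chunk)
```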