Skip to content

Ch.08 missing code #11

@JamesCHub

Description

@JamesCHub

Code in chapter 8 gets a bit sketchy, and the missing bits are not necessarily obvious.

Here's a rough pass at the Human Interruption code. This includes a few other useful techniques and functions, like using Ollama via OpenAI

### Imports

import ast, operator
from typing import Annotated, TypedDict
from uuid import uuid4
import sys

import asyncio
from contextlib import aclosing

from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, ToolCall
from langchain_core.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun

# LangGraph

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.memory import MemorySaver


def arithmetic_eval(s):
    """This function is a safe arithmetic expression evaluator 
    that parses and evaluates mathematical expressions from strings 
    without using Python's potentially dangerous eval() function."""
    binOps = {
                ast.Add: operator.add,
                ast.Sub: operator.sub,
                ast.Mult: operator.mul,
                ast.Div: operator.truediv,
                ast.Mod: operator.mod
                }
    unOps = {
                ast.USub: operator.neg
                }
    node = ast.parse(s, mode='eval')
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        elif isinstance(node, ast.Str):
            return node.s
        elif isinstance(node, ast.Num):
            return node.n
        elif isinstance(node, ast.BinOp):
            return binOps[type(node.op)](_eval(node.left), _eval(node.right))
        elif isinstance(node, ast.UnaryOp):
            return unOps[type(node.op)](_eval(node.operand))
        else:
            raise Exception('Unsupported type {}'.format(node))

    return _eval(node.body)

@tool
def calculator(query: str) -> str:
    """evaluate a mathematical expression"""
    try:
        result = ast.literal_eval(query)
    except:
        # default eval could not handle a number of simple expressions
        result = arithmetic_eval(query)
    return result

# Setup
search = DuckDuckGoSearchRun()  # FYI: internal library is deprecated
ourTools = [search, calculator]

# Using Ollama, via the OpenAI proxy - it's freeeeeee!
useOpenAIProxy = True
if useOpenAIProxy:
    ourModel = ChatOpenAI(
        model="qwen2.5:7b", 
        base_url="http://localhost:11434/v1", 
        api_key="ollama",
        temperature=0
    ).bind_tools(ourTools)

class State(TypedDict):
    messages: Annotated[list, add_messages]

async def model_node(state: State) -> State:
    """Async version of a node

    Args:
        state (State): State

    Returns:
        State: State
    """
    # need to use await and ainvoke here for async
    res = await ourModel.ainvoke(state["messages"])
    return {"messages": res}

# Let's build this mother - async style
async def main():
    # Funcs and Classes

    # Setup
    
    builder = StateGraph(State)
    builder.add_node("model", model_node)
    builder.add_node("tools", ToolNode(ourTools))

    builder.add_edge(START, "model")
    builder.add_conditional_edges("model", tools_condition)
    builder.add_edge("tools", "model")

    ourGraph = builder.compile(checkpointer=MemorySaver())

    # Intiialize the async primitive
    event = asyncio.Event()
    # Normal input
    ourInput = {"messages":[HumanMessage("""How old was the 30th President of the United States when he died?""")]}
    config = {"configurable": {"thread_id": "1"}}
    print("Okay, read to run this graph!")
    print(ourGraph)
    # use aclosing to close the stream properly when interrupted
    # astream is just async version of stream
    async with aclosing(ourGraph.astream(ourInput, config)) as stream:
        async for chunk in stream:
            if event.is_set():
                # if flag is set, break
                break
            else:
                print(chunk)
    # simulate an interruption after 2 seconds
    await asyncio.sleep(2)
    event.set()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Whoah, was not expecting to be interrupted!")
        sys.exit(1)
    except asyncio.CancelledError:
        print("Main: Received cancellation, shutting down gracefully...hahaha")
        sys.exit(1)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions