How to use agent executor astream_events
?
#24851
Replies: 2 comments 1 reply
-
To use the async for event in agent_executor.astream_events(
{"messages": [HumanMessage(content="describe this image: [image_url]")]}, version="v1"
):
kind = event["event"]
if kind == "on_chain_start":
if event["name"] == "Agent":
print(f"Starting agent: {event['name']} with input: {event['data'].get('input')}")
elif kind == "on_chain_end":
if event["name"] == "Agent":
print()
print("--")
print(f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}")
if kind == "on_chain_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="|")
elif kind == "on_tool_start":
print("--")
print(f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}")
elif kind == "on_tool_end":
print(f"Done tool: {event['name']}")
print(f"Tool output was: {event['data'].get('output')}")
print("--") This code ensures that the The For example, in the provided document, the class StreamingRunnable(Runnable[Input, Output]):
"""A custom runnable used for testing purposes"""
iterable: Iterable[Any]
def __init__(self, iterable: Iterable[Any]) -> None:
"""Initialize the runnable."""
self.iterable = iterable
async def astream(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> AsyncIterator[Output]:
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
input,
name=config.get("run_name", self.get_name()),
run_id=config.get("run_id"),
)
try:
final_output = None
for element in self.iterable:
if isinstance(element, BaseException):
raise element
yield element
if final_output is None:
final_output = element
else:
try:
final_output = final_output + element
except TypeError:
final_output = element
# set final channel values as run output
run_manager.on_chain_end(final_output)
except BaseException as e:
run_manager.on_chain_error(e)
raise This example shows how to handle streaming output and manage the execution flow using events, ensuring that the next tool waits until the current tool has finished its execution [1][2][3][4]. |
Beta Was this translation helpful? Give feedback.
-
To use the async for event in agent_executor.astream_events(
{"messages": [HumanMessage(content="describe this image: [image_url]")]}, version="v1"
):
kind = event["event"]
if kind == "on_chain_start":
if event["name"] == "Agent":
print(f"Starting agent: {event['name']} with input: {event['data'].get('input')}")
elif kind == "on_chain_end":
if event["name"] == "Agent":
print()
print("--")
print(f"Done agent: {event['name']} with output: {event['data'].get('output')['output']}")
if kind == "on_chain_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="|")
elif kind == "on_tool_start":
print("--")
print(f"Starting tool: {event['name']} with inputs: {event['data'].get('input')}")
elif kind == "on_tool_end":
print(f"Done tool: {event['name']}")
print(f"Tool output was: {event['data'].get('output')}")
print("--") This code ensures that the The For example, in the provided document, the class StreamingRunnable(Runnable[Input, Output]):
"""A custom runnable used for testing purposes"""
iterable: Iterable[Any]
def __init__(self, iterable: Iterable[Any]) -> None:
"""Initialize the runnable."""
self.iterable = iterable
async def astream(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> AsyncIterator[Output]:
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
dumpd(self),
input,
name=config.get("run_name", self.get_name()),
run_id=config.get("run_id"),
)
try:
final_output = None
for element in self.iterable:
if isinstance(element, BaseException):
raise element
yield element
if final_output is None:
final_output = element
else:
try:
final_output = final_output + element
except TypeError:
final_output = element
# set final channel values as run output
run_manager.on_chain_end(final_output)
except BaseException as e:
run_manager.on_chain_error(e)
raise This example shows how to handle streaming output and manage the execution flow using events, ensuring that the next tool waits until the current tool has finished its execution [1][2][3][4]. |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
Checked other resources
Commit to Help
Example Code
Description
When I use
astream_event
, the tool I've defined calls the OpenAI API to describe an image.Therefore, I want the description process of this tool to output in a streaming manner.
However, I encountered the following two issues:
Due to confidentiality reasons, only pseudocode can be displayed.
System Info
System Information
Package Information
Packages not installed (Not Necessarily a Problem)
The following packages were not found:
Beta Was this translation helpful? Give feedback.
All reactions