Skip to content

Commit ff8ab23

Browse files
authored
[AI Agents] add mcp streaming sample (#42046)
* [AI Agents] add mcp streaming sample * remove outdated line * async streaming example for mcp * copilot feedback
1 parent 5265599 commit ff8ab23

File tree

2 files changed

+350
-0
lines changed

2 files changed

+350
-0
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# pylint: disable=line-too-long,useless-suppression
2+
# ------------------------------------
3+
# Copyright (c) Microsoft Corporation.
4+
# Licensed under the MIT License.
5+
# ------------------------------------
6+
7+
"""
8+
DESCRIPTION:
9+
This sample demonstrates how to use agent operations with an event handler
10+
and Model Context Protocol (MCP) Tool from the Azure Agents service using
11+
an asynchronous client.
12+
13+
To learn more about Model Context Protocol, visit https://modelcontextprotocol.io/
14+
15+
USAGE:
16+
python sample_agents_stream_eventhandler_with_mcp_async.py
17+
18+
Before running the sample:
19+
20+
pip install azure-ai-projects azure-ai-agents azure-identity aiohttp
21+
22+
Set these environment variables with your own values:
23+
1) PROJECT_ENDPOINT - The Azure AI Project endpoint, as found in the Overview
24+
page of your Azure AI Foundry portal.
25+
2) MODEL_DEPLOYMENT_NAME - The deployment name of the AI model, as found under the "Name" column in
26+
the "Models + endpoints" tab in your Azure AI Foundry project.
27+
3) MCP_SERVER_URL - The URL of your MCP server endpoint.
28+
4) MCP_SERVER_LABEL - A label for your MCP server.
29+
"""
30+
import asyncio
31+
from typing import Any
32+
33+
import os
34+
from azure.ai.projects.aio import AIProjectClient
35+
from azure.ai.agents.aio import AgentsClient
36+
from azure.ai.agents.models import (
37+
AsyncAgentEventHandler,
38+
ListSortOrder,
39+
MessageTextContent,
40+
MessageDeltaChunk,
41+
McpTool,
42+
RequiredMcpToolCall,
43+
RunStep,
44+
SubmitToolApprovalAction,
45+
ThreadMessage,
46+
ThreadRun,
47+
ToolApproval,
48+
)
49+
from azure.identity.aio import DefaultAzureCredential
50+
from utils.user_async_functions import user_async_functions
51+
52+
# Get MCP server configuration from environment variables
53+
mcp_server_url = os.environ.get("MCP_SERVER_URL", "https://gitmcp.io/Azure/azure-rest-api-specs")
54+
mcp_server_label = os.environ.get("MCP_SERVER_LABEL", "github")
55+
56+
# Initialize agent MCP tool
57+
mcp_tool = McpTool(
58+
server_label=mcp_server_label,
59+
server_url=mcp_server_url,
60+
allowed_tools=[], # Optional: specify allowed tools
61+
)
62+
63+
class MyEventHandler(AsyncAgentEventHandler[str]):
64+
65+
def __init__(self, agents_client: AgentsClient) -> None:
66+
super().__init__()
67+
self.agents_client = agents_client
68+
69+
async def on_message_delta(self, delta: "MessageDeltaChunk") -> None:
70+
print(f"Text delta received: {delta.text}")
71+
72+
async def on_thread_message(self, message: "ThreadMessage") -> None:
73+
print(f"ThreadMessage created. ID: {message.id}, Status: {message.status}")
74+
75+
async def on_thread_run(self, run: "ThreadRun") -> None:
76+
print(f"ThreadRun status: {run.status}")
77+
78+
if run.status == "failed":
79+
print(f"Run failed. Error: {run.last_error}")
80+
81+
if run.status == "requires_action" and isinstance(run.required_action, SubmitToolApprovalAction):
82+
tool_calls = run.required_action.submit_tool_approval.tool_calls
83+
84+
tool_approvals = []
85+
for tool_call in tool_calls:
86+
if isinstance(tool_call, RequiredMcpToolCall):
87+
try:
88+
print(f"Approving tool call: {tool_call}")
89+
tool_approvals.append(
90+
ToolApproval(
91+
tool_call_id=tool_call.id,
92+
approve=True,
93+
headers=mcp_tool.headers,
94+
)
95+
)
96+
except Exception as e:
97+
print(f"Error approving tool_call {tool_call.id}: {e}")
98+
99+
print(f"tool_approvals: {tool_approvals}")
100+
if tool_approvals:
101+
# Once we receive 'requires_action' status, the next event will be DONE.
102+
# Here we associate our existing event handler to the next stream.
103+
await self.agents_client.runs.submit_tool_outputs_stream(
104+
thread_id=run.thread_id, run_id=run.id, tool_approvals=tool_approvals, event_handler=self
105+
)
106+
107+
async def on_run_step(self, step: "RunStep") -> None:
108+
print(f"RunStep type: {step.type}, Status: {step.status}")
109+
110+
async def on_error(self, data: str) -> None:
111+
print(f"An error occurred. Data: {data}")
112+
113+
async def on_done(self) -> None:
114+
print("Stream completed.")
115+
116+
async def on_unhandled_event(self, event_type: str, event_data: Any) -> None:
117+
print(f"Unhandled Event Type: {event_type}, Data: {event_data}")
118+
119+
120+
async def main() -> None:
121+
project_client = AIProjectClient(
122+
endpoint=os.environ["PROJECT_ENDPOINT"],
123+
credential=DefaultAzureCredential(),
124+
)
125+
126+
async with project_client:
127+
agents_client = project_client.agents
128+
129+
# add allowed tools dynamically
130+
search_api_code = "search_azure_rest_api_code"
131+
mcp_tool.allow_tool(search_api_code)
132+
print(f"Allowed tools: {mcp_tool.allowed_tools}")
133+
134+
agent = await agents_client.create_agent(
135+
model=os.environ["MODEL_DEPLOYMENT_NAME"],
136+
name="my-agent",
137+
instructions="You are a helpful agent that can use MCP tools to assist users. Use the available MCP tools to answer questions and perform tasks.",
138+
tools=mcp_tool.definitions,
139+
)
140+
print(f"Created agent, ID: {agent.id}")
141+
142+
thread = await agents_client.threads.create()
143+
print(f"Created thread, thread ID {thread.id}")
144+
145+
message = await agents_client.messages.create(
146+
thread_id=thread.id,
147+
role="user",
148+
content="Please summarize the Azure REST API specifications Readme"
149+
)
150+
print(f"Created message, message ID {message.id}")
151+
152+
mcp_tool.update_headers("SuperSecret", "123456")
153+
# mcp_tool.set_approval_mode("never") # Uncomment to disable approval requirement
154+
155+
async with await agents_client.runs.stream(
156+
thread_id=thread.id,
157+
agent_id=agent.id,
158+
tool_resources=mcp_tool.resources,
159+
event_handler=MyEventHandler(agents_client),
160+
) as stream:
161+
await stream.until_done()
162+
163+
await agents_client.delete_agent(agent.id)
164+
print("Deleted agent")
165+
166+
messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)
167+
async for msg in messages:
168+
last_part = msg.content[-1]
169+
if isinstance(last_part, MessageTextContent):
170+
print(f"{msg.role}: {last_part.text.value}")
171+
172+
173+
if __name__ == "__main__":
174+
asyncio.run(main())
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# pylint: disable=line-too-long,useless-suppression
2+
# ------------------------------------
3+
# Copyright (c) Microsoft Corporation.
4+
# Licensed under the MIT License.
5+
# ------------------------------------
6+
7+
"""
8+
DESCRIPTION:
9+
This sample demonstrates how to use agent operations with the
10+
Model Context Protocol (MCP) tool from the Azure Agents service, and
11+
iteration in streaming. It uses a synchronous client.
12+
To learn more about Model Context Protocol, visit https://modelcontextprotocol.io/
13+
14+
USAGE:
15+
python sample_agents_stream_iteration_with_mcp.py
16+
17+
Before running the sample:
18+
19+
pip install azure-ai-projects azure-ai-agents azure-identity
20+
21+
Set these environment variables with your own values:
22+
1) PROJECT_ENDPOINT - The Azure AI Project endpoint, as found in the Overview
23+
page of your Azure AI Foundry portal.
24+
2) MODEL_DEPLOYMENT_NAME - The deployment name of the AI model, as found under the "Name" column in
25+
the "Models + endpoints" tab in your Azure AI Foundry project.
26+
3) MCP_SERVER_URL - The URL of your MCP server endpoint.
27+
4) MCP_SERVER_LABEL - A label for your MCP server.
28+
"""
29+
30+
import os
31+
from azure.ai.projects import AIProjectClient
32+
from azure.ai.agents.models import AgentStreamEvent, RunStepDeltaChunk
33+
from azure.ai.agents.models import (
34+
MessageDeltaChunk,
35+
RunStep,
36+
ThreadMessage,
37+
ThreadRun,
38+
McpTool,
39+
MessageRole,
40+
MessageDeltaTextContent,
41+
MessageDeltaTextUrlCitationAnnotation,
42+
RequiredMcpToolCall,
43+
SubmitToolApprovalAction,
44+
ToolApproval
45+
)
46+
from azure.identity import DefaultAzureCredential
47+
48+
project_client = AIProjectClient(
49+
endpoint=os.environ["PROJECT_ENDPOINT"],
50+
credential=DefaultAzureCredential(),
51+
)
52+
53+
with project_client:
54+
agents_client = project_client.agents
55+
56+
# Get MCP server configuration from environment variables
57+
mcp_server_url = os.environ.get("MCP_SERVER_URL", "https://gitmcp.io/Azure/azure-rest-api-specs")
58+
mcp_server_label = os.environ.get("MCP_SERVER_LABEL", "github")
59+
60+
# Initialize agent MCP tool
61+
mcp_tool = McpTool(
62+
server_label=mcp_server_label,
63+
server_url=mcp_server_url,
64+
allowed_tools=[], # Optional: specify allowed tools
65+
)
66+
67+
# You can also add or remove allowed tools dynamically
68+
search_api_code = "search_azure_rest_api_code"
69+
mcp_tool.allow_tool(search_api_code)
70+
print(f"Allowed tools: {mcp_tool.allowed_tools}")
71+
72+
# Create a new agent.
73+
# NOTE: To reuse existing agent, fetch it with get_agent(agent.id)
74+
agent = agents_client.create_agent(
75+
model=os.environ["MODEL_DEPLOYMENT_NAME"],
76+
name="my-agent",
77+
instructions="You are a helpful agent that can use MCP tools to assist users. Use the available MCP tools to answer questions and perform tasks.",
78+
tools=mcp_tool.definitions,
79+
)
80+
print(f"Created agent, agent ID: {agent.id}")
81+
82+
thread = agents_client.threads.create()
83+
print(f"Created thread, thread ID {thread.id}")
84+
85+
message = agents_client.messages.create(
86+
thread_id=thread.id,
87+
role=MessageRole.USER,
88+
content="Please summarize the Azure REST API specifications Readme"
89+
)
90+
print(f"Created message, message ID {message.id}")
91+
92+
# Process Agent run and stream events back to the client. It may take a few minutes for the agent to complete the run.
93+
mcp_tool.update_headers("SuperSecret", "123456")
94+
# mcp_tool.set_approval_mode("never") # Uncomment to disable approval requirement
95+
with agents_client.runs.stream(thread_id=thread.id, agent_id=agent.id, tool_resources=mcp_tool.resources) as stream:
96+
97+
for event_type, event_data, _ in stream:
98+
99+
if isinstance(event_data, MessageDeltaChunk):
100+
print(f"Text delta received: {event_data.text}")
101+
if event_data.delta.content and isinstance(event_data.delta.content[0], MessageDeltaTextContent):
102+
delta_text_content = event_data.delta.content[0]
103+
if delta_text_content.text and delta_text_content.text.annotations:
104+
for delta_annotation in delta_text_content.text.annotations:
105+
if isinstance(delta_annotation, MessageDeltaTextUrlCitationAnnotation):
106+
print(
107+
f"URL citation delta received: [{delta_annotation.url_citation.title}]({delta_annotation.url_citation.url})"
108+
)
109+
110+
elif isinstance(event_data, RunStepDeltaChunk):
111+
print(f"RunStepDeltaChunk received. ID: {event_data.id}.")
112+
113+
elif isinstance(event_data, ThreadMessage):
114+
print(f"ThreadMessage created. ID: {event_data.id}, Status: {event_data.status}")
115+
116+
elif isinstance(event_data, ThreadRun):
117+
print(f"ThreadRun status: {event_data.status}")
118+
119+
if event_data.status == "failed":
120+
print(f"Run failed. Error: {event_data.last_error}")
121+
122+
if event_data.status == "requires_action" and isinstance(
123+
event_data.required_action, SubmitToolApprovalAction
124+
):
125+
tool_calls = event_data.required_action.submit_tool_approval.tool_calls
126+
if not tool_calls:
127+
print("No tool calls provided - cancelling run")
128+
agents_client.runs.cancel(thread_id=event_data.thread_id, run_id=event_data.id)
129+
break
130+
131+
tool_approvals = []
132+
for tool_call in tool_calls:
133+
if isinstance(tool_call, RequiredMcpToolCall):
134+
try:
135+
print(f"Approving tool call: {tool_call}")
136+
tool_approvals.append(
137+
ToolApproval(
138+
tool_call_id=tool_call.id,
139+
approve=True,
140+
headers=mcp_tool.headers,
141+
)
142+
)
143+
except Exception as e:
144+
print(f"Error approving tool_call {tool_call.id}: {e}")
145+
146+
print(f"tool_approvals: {tool_approvals}")
147+
if tool_approvals:
148+
# Once we receive 'requires_action' status, the next event will be DONE.
149+
# Here we associate our existing event handler to the next stream.
150+
agents_client.runs.submit_tool_outputs_stream(
151+
thread_id=event_data.thread_id, run_id=event_data.id, tool_approvals=tool_approvals, event_handler=stream
152+
)
153+
154+
elif isinstance(event_data, RunStep):
155+
print(f"RunStep type: {event_data.type}, Status: {event_data.status}")
156+
157+
elif event_type == AgentStreamEvent.ERROR:
158+
print(f"An error occurred. Data: {event_data}")
159+
160+
elif event_type == AgentStreamEvent.DONE:
161+
print("Stream completed.")
162+
163+
else:
164+
print(f"Unhandled Event Type: {event_type}, Data: {event_data}")
165+
166+
# Clean-up and delete the agent once the run is finished.
167+
# NOTE: Comment out this line if you plan to reuse the agent later.
168+
agents_client.delete_agent(agent.id)
169+
print("Deleted agent")
170+
171+
response_message = agents_client.messages.get_last_message_by_role(thread_id=thread.id, role=MessageRole.AGENT)
172+
if response_message:
173+
for text_message in response_message.text_messages:
174+
print(f"Agent response: {text_message.text.value}")
175+
for annotation in response_message.url_citation_annotations:
176+
print(f"URL Citation: [{annotation.url_citation.title}]({annotation.url_citation.url})")

0 commit comments

Comments
 (0)