Skip to content

Implement FastMCP 2.0 upgrade and httpx monkey patch #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 101 additions & 174 deletions agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'auth_server'))
from cognito_utils import generate_token

# Import the httpx patch context manager
from httpx_patch import httpx_mount_path_patch

# Configure logging with basicConfig
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO
Expand Down Expand Up @@ -352,7 +355,9 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st

# Create the server URL by joining the base URL with the server name and sse path
server_url = urljoin(base_url + '/', f"{server_name}/sse")
print(f"Server URL: {server_url}")
logger.info(f"invoke_mcp_tool, Server URL: {server_url}")

# Use context manager to apply httpx monkey patch

# Prepare headers based on authentication method
headers = {
Expand All @@ -379,23 +384,24 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st
}

try:
# Create an MCP SSE client and call the tool with authentication headers
#print(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
logger.info(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
async with mcp.ClientSession(read, write, sampling_callback=None) as session:
# Initialize the connection
await session.initialize()

# Call the specified tool with the provided arguments
result = await session.call_tool(tool_name, arguments=arguments)

# Format the result as a string
response = ""
for r in result.content:
response += r.text + "\n"

return response.strip()
# Use context manager to apply httpx monkey patch and create MCP client
async with httpx_mount_path_patch(server_url):
# Create an MCP SSE client and call the tool with authentication headers
logger.info(f"invoke_mcp_tool, Connecting to MCP server: {server_url}, headers: {redacted_headers}")
async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
async with mcp.ClientSession(read, write, sampling_callback=None) as session:
# Initialize the connection
await session.initialize()

# Call the specified tool with the provided arguments
result = await session.call_tool(tool_name, arguments=arguments)

# Format the result as a string
response = ""
for r in result.content:
response += r.text + "\n"

return response.strip()
except Exception as e:
return f"Error invoking MCP tool: {str(e)}"

Expand All @@ -408,59 +414,6 @@ def redact_sensitive_value(value: str, show_chars: int = 4) -> str:
return "*" * len(value) if value else ""
return value[:show_chars] + "*" * (len(value) - show_chars)

def normalize_sse_endpoint_url_for_request(url_str: str, original_sse_url: str) -> str:
"""
Normalize URLs in HTTP requests by preserving mount paths for non-mounted servers.

This function only applies fixes when the request is for the same server as the original SSE URL.
It should NOT modify requests to different servers (like currenttime, fininfo, etc.)

Example:
- Original SSE: http://localhost/mcpgw2/sse
- Request to same server: http://localhost/messages/?session_id=123 -> http://localhost/mcpgw2/messages/?session_id=123
- Request to different server: http://localhost/currenttime/messages/?session_id=123 -> unchanged (already correct)
"""
if '/messages/' not in url_str:
return url_str

# Parse the original SSE URL to extract the base path
from urllib.parse import urlparse
parsed_original = urlparse(original_sse_url)
parsed_current = urlparse(url_str)

# Only apply fixes if this is the same host/port as the original SSE URL
if parsed_current.netloc != parsed_original.netloc:
return url_str

original_path = parsed_original.path

# Remove /sse from the original path to get the base mount path
if original_path.endswith('/sse'):
base_mount_path = original_path[:-4] # Remove '/sse'
else:
base_mount_path = original_path

# Only apply the fix if:
# 1. There is a base mount path (non-empty)
# 2. The current path is exactly /messages/... (indicating it's missing the mount path)
# 3. The current path doesn't already contain a mount path
if (base_mount_path and
parsed_current.path.startswith('/messages/') and
not parsed_current.path.startswith(base_mount_path)):

# The mount path is missing, we need to add it back
# Reconstruct the URL with the mount path
new_path = base_mount_path + parsed_current.path
fixed_url = f"{parsed_current.scheme}://{parsed_current.netloc}{new_path}"
if parsed_current.query:
fixed_url += f"?{parsed_current.query}"
if parsed_current.fragment:
fixed_url += f"#{parsed_current.fragment}"

logger.debug(f"Fixed mount path in request URL: {url_str} -> {fixed_url}")
return fixed_url

return url_str

def load_system_prompt():
"""
Expand Down Expand Up @@ -656,117 +609,91 @@ async def main():
redacted_headers[k] = v
logger.info(f"Using authentication headers: {redacted_headers}")

# Apply monkey patch to fix mount path issues in httpx requests
# This fixes the issue where non-mounted servers with default paths lose their mount path
# in POST requests to /messages/ endpoints
original_request = httpx.AsyncClient.request

async def patched_request(self, method, url, **kwargs):
# Fix mount path issues in requests
if isinstance(url, str) and '/messages/' in url:
url = normalize_sse_endpoint_url_for_request(url, server_url)
elif hasattr(url, '__str__') and '/messages/' in str(url):
url = normalize_sse_endpoint_url_for_request(str(url), server_url)
return await original_request(self, method, url, **kwargs)

# Apply the patch
httpx.AsyncClient.request = patched_request
logger.info("Applied httpx monkey patch to fix mount path issues")

try:
# Initialize MCP client with the server configuration and authentication headers
client = MultiServerMCPClient(
{
"mcp_registry": {
"url": server_url,
"transport": "sse",
"headers": auth_headers
# Use context manager to apply httpx monkey patch
async with httpx_mount_path_patch(server_url):
# Initialize MCP client with the server configuration and authentication headers
client = MultiServerMCPClient(
{
"mcp_registry": {
"url": server_url,
"transport": "sse",
"headers": auth_headers
}
}
}
)
logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url)

# Get available tools from MCP and display them
mcp_tools = await client.get_tools()
logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}")

# Add the calculator and invoke_mcp_tool to the tools array
# The invoke_mcp_tool function already supports authentication parameters
all_tools = [calculator, invoke_mcp_tool] + mcp_tools
logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}")

# Create the agent with the model and all tools
agent = create_react_agent(
model,
all_tools
)

# Load and format the system prompt with the current time and MCP registry URL
system_prompt_template = load_system_prompt()

# Prepare authentication parameters for system prompt
if args.use_session_cookie:
system_prompt = system_prompt_template.format(
current_utc_time=current_utc_time,
mcp_registry_url=args.mcp_registry_url,
auth_token='', # Not used for session cookie auth
user_pool_id=args.user_pool_id or '',
client_id=args.client_id or '',
region=args.region or 'us-east-1',
auth_method=auth_method,
session_cookie=session_cookie
)
else:
system_prompt = system_prompt_template.format(
current_utc_time=current_utc_time,
mcp_registry_url=args.mcp_registry_url,
auth_token=access_token,
user_pool_id=args.user_pool_id,
client_id=args.client_id,
region=args.region,
auth_method=auth_method,
session_cookie='' # Not used for M2M auth
logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url)

# Get available tools from MCP and display them
mcp_tools = await client.get_tools()
logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}")

# Add the calculator and invoke_mcp_tool to the tools array
# The invoke_mcp_tool function already supports authentication parameters
all_tools = [calculator, invoke_mcp_tool] + mcp_tools
logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}")

# Create the agent with the model and all tools
agent = create_react_agent(
model,
all_tools
)

# Format the message with system message first
formatted_messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": args.message}
]

logger.info("\nInvoking agent...\n" + "-"*40)

# Invoke the agent with the formatted messages
response = await agent.ainvoke({"messages": formatted_messages})

logger.info("\nResponse:" + "\n" + "-"*40)
#print(response)
print_agent_response(response)

# Process and display the response
if response and "messages" in response and response["messages"]:
# Get the last message from the response
last_message = response["messages"][-1]

if isinstance(last_message, dict) and "content" in last_message:
# Display the content of the response
print(last_message["content"])
# Load and format the system prompt with the current time and MCP registry URL
system_prompt_template = load_system_prompt()

# Prepare authentication parameters for system prompt
if args.use_session_cookie:
system_prompt = system_prompt_template.format(
current_utc_time=current_utc_time,
mcp_registry_url=args.mcp_registry_url,
auth_token='', # Not used for session cookie auth
user_pool_id=args.user_pool_id or '',
client_id=args.client_id or '',
region=args.region or 'us-east-1',
auth_method=auth_method,
session_cookie=session_cookie
)
else:
print(str(last_message.content))
else:
print("No valid response received")
system_prompt = system_prompt_template.format(
current_utc_time=current_utc_time,
mcp_registry_url=args.mcp_registry_url,
auth_token=access_token,
user_pool_id=args.user_pool_id,
client_id=args.client_id,
region=args.region,
auth_method=auth_method,
session_cookie='' # Not used for M2M auth
)

# Format the message with system message first
formatted_messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": args.message}
]

logger.info("\nInvoking agent...\n" + "-"*40)

finally:
# Restore original httpx behavior
httpx.AsyncClient.request = original_request
logger.info("Restored original httpx behavior")
# Invoke the agent with the formatted messages
response = await agent.ainvoke({"messages": formatted_messages})

logger.info("\nResponse:" + "\n" + "-"*40)
#print(response)
print_agent_response(response)

# Process and display the response
if response and "messages" in response and response["messages"]:
# Get the last message from the response
last_message = response["messages"][-1]

if isinstance(last_message, dict) and "content" in last_message:
# Display the content of the response
print(last_message["content"])
else:
print(str(last_message.content))
else:
print("No valid response received")

except Exception as e:
# Restore original httpx behavior in case of error
try:
httpx.AsyncClient.request = original_request
except NameError:
pass # original_request might not be defined if error occurred before monkey patch
print(f"Error: {str(e)}")
import traceback
print(traceback.format_exc())
Expand Down
Loading
Loading