Skip to content

Commit 7b22e3b

Browse files
authored
Merge pull request #60 from agentic-community/feature/fastmcp-2.0-upgrade-issue-59
Implement FastMCP 2.0 upgrade and httpx monkey patch
2 parents 99fbf16 + 54f77ab commit 7b22e3b

File tree

8 files changed

+544
-198
lines changed

8 files changed

+544
-198
lines changed

agents/agent.py

Lines changed: 101 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'auth_server'))
7070
from cognito_utils import generate_token
7171

72+
# Import the httpx patch context manager
73+
from httpx_patch import httpx_mount_path_patch
74+
7275
# Configure logging with basicConfig
7376
logging.basicConfig(
7477
level=logging.INFO, # Set the log level to INFO
@@ -352,7 +355,9 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st
352355

353356
# Create the server URL by joining the base URL with the server name and sse path
354357
server_url = urljoin(base_url + '/', f"{server_name}/sse")
355-
print(f"Server URL: {server_url}")
358+
logger.info(f"invoke_mcp_tool, Server URL: {server_url}")
359+
360+
# Use context manager to apply httpx monkey patch
356361

357362
# Prepare headers based on authentication method
358363
headers = {
@@ -379,23 +384,24 @@ async def invoke_mcp_tool(mcp_registry_url: str, server_name: str, tool_name: st
379384
}
380385

381386
try:
382-
# Create an MCP SSE client and call the tool with authentication headers
383-
#print(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
384-
logger.info(f"Connecting to MCP server: {server_url}, headers: {redacted_headers}")
385-
async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
386-
async with mcp.ClientSession(read, write, sampling_callback=None) as session:
387-
# Initialize the connection
388-
await session.initialize()
389-
390-
# Call the specified tool with the provided arguments
391-
result = await session.call_tool(tool_name, arguments=arguments)
392-
393-
# Format the result as a string
394-
response = ""
395-
for r in result.content:
396-
response += r.text + "\n"
397-
398-
return response.strip()
387+
# Use context manager to apply httpx monkey patch and create MCP client
388+
async with httpx_mount_path_patch(server_url):
389+
# Create an MCP SSE client and call the tool with authentication headers
390+
logger.info(f"invoke_mcp_tool, Connecting to MCP server: {server_url}, headers: {redacted_headers}")
391+
async with mcp.client.sse.sse_client(server_url, headers=headers) as (read, write):
392+
async with mcp.ClientSession(read, write, sampling_callback=None) as session:
393+
# Initialize the connection
394+
await session.initialize()
395+
396+
# Call the specified tool with the provided arguments
397+
result = await session.call_tool(tool_name, arguments=arguments)
398+
399+
# Format the result as a string
400+
response = ""
401+
for r in result.content:
402+
response += r.text + "\n"
403+
404+
return response.strip()
399405
except Exception as e:
400406
return f"Error invoking MCP tool: {str(e)}"
401407

@@ -408,59 +414,6 @@ def redact_sensitive_value(value: str, show_chars: int = 4) -> str:
408414
return "*" * len(value) if value else ""
409415
return value[:show_chars] + "*" * (len(value) - show_chars)
410416

411-
def normalize_sse_endpoint_url_for_request(url_str: str, original_sse_url: str) -> str:
412-
"""
413-
Normalize URLs in HTTP requests by preserving mount paths for non-mounted servers.
414-
415-
This function only applies fixes when the request is for the same server as the original SSE URL.
416-
It should NOT modify requests to different servers (like currenttime, fininfo, etc.)
417-
418-
Example:
419-
- Original SSE: http://localhost/mcpgw2/sse
420-
- Request to same server: http://localhost/messages/?session_id=123 -> http://localhost/mcpgw2/messages/?session_id=123
421-
- Request to different server: http://localhost/currenttime/messages/?session_id=123 -> unchanged (already correct)
422-
"""
423-
if '/messages/' not in url_str:
424-
return url_str
425-
426-
# Parse the original SSE URL to extract the base path
427-
from urllib.parse import urlparse
428-
parsed_original = urlparse(original_sse_url)
429-
parsed_current = urlparse(url_str)
430-
431-
# Only apply fixes if this is the same host/port as the original SSE URL
432-
if parsed_current.netloc != parsed_original.netloc:
433-
return url_str
434-
435-
original_path = parsed_original.path
436-
437-
# Remove /sse from the original path to get the base mount path
438-
if original_path.endswith('/sse'):
439-
base_mount_path = original_path[:-4] # Remove '/sse'
440-
else:
441-
base_mount_path = original_path
442-
443-
# Only apply the fix if:
444-
# 1. There is a base mount path (non-empty)
445-
# 2. The current path is exactly /messages/... (indicating it's missing the mount path)
446-
# 3. The current path doesn't already contain a mount path
447-
if (base_mount_path and
448-
parsed_current.path.startswith('/messages/') and
449-
not parsed_current.path.startswith(base_mount_path)):
450-
451-
# The mount path is missing, we need to add it back
452-
# Reconstruct the URL with the mount path
453-
new_path = base_mount_path + parsed_current.path
454-
fixed_url = f"{parsed_current.scheme}://{parsed_current.netloc}{new_path}"
455-
if parsed_current.query:
456-
fixed_url += f"?{parsed_current.query}"
457-
if parsed_current.fragment:
458-
fixed_url += f"#{parsed_current.fragment}"
459-
460-
logger.debug(f"Fixed mount path in request URL: {url_str} -> {fixed_url}")
461-
return fixed_url
462-
463-
return url_str
464417

465418
def load_system_prompt():
466419
"""
@@ -656,117 +609,91 @@ async def main():
656609
redacted_headers[k] = v
657610
logger.info(f"Using authentication headers: {redacted_headers}")
658611

659-
# Apply monkey patch to fix mount path issues in httpx requests
660-
# This fixes the issue where non-mounted servers with default paths lose their mount path
661-
# in POST requests to /messages/ endpoints
662-
original_request = httpx.AsyncClient.request
663-
664-
async def patched_request(self, method, url, **kwargs):
665-
# Fix mount path issues in requests
666-
if isinstance(url, str) and '/messages/' in url:
667-
url = normalize_sse_endpoint_url_for_request(url, server_url)
668-
elif hasattr(url, '__str__') and '/messages/' in str(url):
669-
url = normalize_sse_endpoint_url_for_request(str(url), server_url)
670-
return await original_request(self, method, url, **kwargs)
671-
672-
# Apply the patch
673-
httpx.AsyncClient.request = patched_request
674-
logger.info("Applied httpx monkey patch to fix mount path issues")
675-
676-
try:
677-
# Initialize MCP client with the server configuration and authentication headers
678-
client = MultiServerMCPClient(
679-
{
680-
"mcp_registry": {
681-
"url": server_url,
682-
"transport": "sse",
683-
"headers": auth_headers
612+
# Use context manager to apply httpx monkey patch
613+
async with httpx_mount_path_patch(server_url):
614+
# Initialize MCP client with the server configuration and authentication headers
615+
client = MultiServerMCPClient(
616+
{
617+
"mcp_registry": {
618+
"url": server_url,
619+
"transport": "sse",
620+
"headers": auth_headers
621+
}
684622
}
685-
}
686-
)
687-
logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url)
688-
689-
# Get available tools from MCP and display them
690-
mcp_tools = await client.get_tools()
691-
logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}")
692-
693-
# Add the calculator and invoke_mcp_tool to the tools array
694-
# The invoke_mcp_tool function already supports authentication parameters
695-
all_tools = [calculator, invoke_mcp_tool] + mcp_tools
696-
logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}")
697-
698-
# Create the agent with the model and all tools
699-
agent = create_react_agent(
700-
model,
701-
all_tools
702-
)
703-
704-
# Load and format the system prompt with the current time and MCP registry URL
705-
system_prompt_template = load_system_prompt()
706-
707-
# Prepare authentication parameters for system prompt
708-
if args.use_session_cookie:
709-
system_prompt = system_prompt_template.format(
710-
current_utc_time=current_utc_time,
711-
mcp_registry_url=args.mcp_registry_url,
712-
auth_token='', # Not used for session cookie auth
713-
user_pool_id=args.user_pool_id or '',
714-
client_id=args.client_id or '',
715-
region=args.region or 'us-east-1',
716-
auth_method=auth_method,
717-
session_cookie=session_cookie
718623
)
719-
else:
720-
system_prompt = system_prompt_template.format(
721-
current_utc_time=current_utc_time,
722-
mcp_registry_url=args.mcp_registry_url,
723-
auth_token=access_token,
724-
user_pool_id=args.user_pool_id,
725-
client_id=args.client_id,
726-
region=args.region,
727-
auth_method=auth_method,
728-
session_cookie='' # Not used for M2M auth
624+
logger.info("Connected to MCP server successfully with authentication, server_url: " + server_url)
625+
626+
# Get available tools from MCP and display them
627+
mcp_tools = await client.get_tools()
628+
logger.info(f"Available MCP tools: {[tool.name for tool in mcp_tools]}")
629+
630+
# Add the calculator and invoke_mcp_tool to the tools array
631+
# The invoke_mcp_tool function already supports authentication parameters
632+
all_tools = [calculator, invoke_mcp_tool] + mcp_tools
633+
logger.info(f"All available tools: {[tool.name if hasattr(tool, 'name') else tool.__name__ for tool in all_tools]}")
634+
635+
# Create the agent with the model and all tools
636+
agent = create_react_agent(
637+
model,
638+
all_tools
729639
)
730-
731-
# Format the message with system message first
732-
formatted_messages = [
733-
{"role": "system", "content": system_prompt},
734-
{"role": "user", "content": args.message}
735-
]
736-
737-
logger.info("\nInvoking agent...\n" + "-"*40)
738-
739-
# Invoke the agent with the formatted messages
740-
response = await agent.ainvoke({"messages": formatted_messages})
741-
742-
logger.info("\nResponse:" + "\n" + "-"*40)
743-
#print(response)
744-
print_agent_response(response)
745-
746-
# Process and display the response
747-
if response and "messages" in response and response["messages"]:
748-
# Get the last message from the response
749-
last_message = response["messages"][-1]
750640

751-
if isinstance(last_message, dict) and "content" in last_message:
752-
# Display the content of the response
753-
print(last_message["content"])
641+
# Load and format the system prompt with the current time and MCP registry URL
642+
system_prompt_template = load_system_prompt()
643+
644+
# Prepare authentication parameters for system prompt
645+
if args.use_session_cookie:
646+
system_prompt = system_prompt_template.format(
647+
current_utc_time=current_utc_time,
648+
mcp_registry_url=args.mcp_registry_url,
649+
auth_token='', # Not used for session cookie auth
650+
user_pool_id=args.user_pool_id or '',
651+
client_id=args.client_id or '',
652+
region=args.region or 'us-east-1',
653+
auth_method=auth_method,
654+
session_cookie=session_cookie
655+
)
754656
else:
755-
print(str(last_message.content))
756-
else:
757-
print("No valid response received")
657+
system_prompt = system_prompt_template.format(
658+
current_utc_time=current_utc_time,
659+
mcp_registry_url=args.mcp_registry_url,
660+
auth_token=access_token,
661+
user_pool_id=args.user_pool_id,
662+
client_id=args.client_id,
663+
region=args.region,
664+
auth_method=auth_method,
665+
session_cookie='' # Not used for M2M auth
666+
)
667+
668+
# Format the message with system message first
669+
formatted_messages = [
670+
{"role": "system", "content": system_prompt},
671+
{"role": "user", "content": args.message}
672+
]
673+
674+
logger.info("\nInvoking agent...\n" + "-"*40)
758675

759-
finally:
760-
# Restore original httpx behavior
761-
httpx.AsyncClient.request = original_request
762-
logger.info("Restored original httpx behavior")
676+
# Invoke the agent with the formatted messages
677+
response = await agent.ainvoke({"messages": formatted_messages})
678+
679+
logger.info("\nResponse:" + "\n" + "-"*40)
680+
#print(response)
681+
print_agent_response(response)
682+
683+
# Process and display the response
684+
if response and "messages" in response and response["messages"]:
685+
# Get the last message from the response
686+
last_message = response["messages"][-1]
687+
688+
if isinstance(last_message, dict) and "content" in last_message:
689+
# Display the content of the response
690+
print(last_message["content"])
691+
else:
692+
print(str(last_message.content))
693+
else:
694+
print("No valid response received")
763695

764696
except Exception as e:
765-
# Restore original httpx behavior in case of error
766-
try:
767-
httpx.AsyncClient.request = original_request
768-
except NameError:
769-
pass # original_request might not be defined if error occurred before monkey patch
770697
print(f"Error: {str(e)}")
771698
import traceback
772699
print(traceback.format_exc())

0 commit comments

Comments
 (0)