Skip to content

Add server-side Streamable HTTP transport support #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ public static class HttpMcpServerBuilderExtensions
{
/// <summary>
/// Adds the services necessary for <see cref="M:McpEndpointRouteBuilderExtensions.MapMcp"/>
/// to handle MCP requests and sessions using the MCP HTTP Streaming transport. For more information on configuring the underlying HTTP server
/// to handle MCP requests and sessions using the MCP Streamable HTTP transport. For more information on configuring the underlying HTTP server
/// to control things like port binding custom TLS certificates, see the <see href="https://learn.microsoft.com/aspnet/core/fundamentals/minimal-apis">Minimal APIs quick reference</see>.
/// </summary>
/// <param name="builder">The builder instance.</param>
/// <param name="configureOptions">Configures options for the HTTP Streaming transport. This allows configuring per-session
/// <param name="configureOptions">Configures options for the Streamable HTTP transport. This allows configuring per-session
/// <see cref="McpServerOptions"/> and running logic before and after a session.</param>
/// <returns>The builder provided in <paramref name="builder"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="builder"/> is <see langword="null"/>.</exception>
public static IMcpServerBuilder WithHttpTransport(this IMcpServerBuilder builder, Action<HttpServerTransportOptions>? configureOptions = null)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.TryAddSingleton<StreamableHttpHandler>();
builder.Services.TryAddSingleton<SseHandler>();
builder.Services.AddHostedService<IdleTrackingBackgroundService>();

if (configureOptions is not null)
{
Expand Down
66 changes: 60 additions & 6 deletions src/ModelContextProtocol.AspNetCore/HttpMcpSession.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,61 @@
using ModelContextProtocol.Protocol.Transport;
using ModelContextProtocol.Server;
using System.Security.Claims;

namespace ModelContextProtocol.AspNetCore;

internal class HttpMcpSession
internal sealed class HttpMcpSession<TTransport>(string sessionId, TTransport transport, ClaimsPrincipal user, TimeProvider timeProvider) : IAsyncDisposable
where TTransport : ITransport
{
public HttpMcpSession(SseResponseStreamTransport transport, ClaimsPrincipal user)
private int _referenceCount;
private int _getRequestStarted;
private CancellationTokenSource _disposeCts = new();

public string Id { get; } = sessionId;
public TTransport Transport { get; } = transport;
public (string Type, string Value, string Issuer)? UserIdClaim { get; } = GetUserIdClaim(user);

public CancellationToken SessionClosed => _disposeCts.Token;

public bool IsActive => !SessionClosed.IsCancellationRequested && _referenceCount > 0;
public long LastActivityTicks { get; private set; } = timeProvider.GetTimestamp();

public IMcpServer? Server { get; set; }
public Task? ServerRunTask { get; set; }

public IDisposable AcquireReference()
{
Transport = transport;
UserIdClaim = GetUserIdClaim(user);
Interlocked.Increment(ref _referenceCount);
return new UnreferenceDisposable(this, timeProvider);
}

public SseResponseStreamTransport Transport { get; }
public (string Type, string Value, string Issuer)? UserIdClaim { get; }
public bool TryStartGetRequest() => Interlocked.Exchange(ref _getRequestStarted, 1) == 0;

public async ValueTask DisposeAsync()
{
try
{
await _disposeCts.CancelAsync();

if (ServerRunTask is not null)
{
await ServerRunTask;
}
}
catch (OperationCanceledException)
{
}
finally
{
if (Server is not null)
{
await Server.DisposeAsync();
}

await Transport.DisposeAsync();
_disposeCts.Dispose();
}
}

public bool HasSameUserId(ClaimsPrincipal user)
=> UserIdClaim == GetUserIdClaim(user);
Expand All @@ -36,4 +79,15 @@ private static (string Type, string Value, string Issuer)? GetUserIdClaim(Claims

return null;
}

private sealed class UnreferenceDisposable(HttpMcpSession<TTransport> session, TimeProvider timeProvider) : IDisposable
{
public void Dispose()
{
if (Interlocked.Decrement(ref session._referenceCount) == 0)
{
session.LastActivityTicks = timeProvider.GetTimestamp();
}
}
}
}
13 changes: 13 additions & 0 deletions src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,17 @@ public class HttpServerTransportOptions
/// This is useful for running logic before a sessions starts and after it completes.
/// </summary>
public Func<HttpContext, IMcpServer, CancellationToken, Task>? RunSessionHandler { get; set; }

/// <summary>
/// Represents the duration of time the server will wait between any active requests before timing out an
/// MCP session. This is checked in background every 5 seconds. A client trying to resume a session will
/// receive a 404 status code and should restart their session. A client can keep their session open by
/// keeping a GET request open. The default value is set to 2 minutes.
/// </summary>
public TimeSpan IdleTimeout { get; set; } = TimeSpan.FromMinutes(2);

/// <summary>
/// Used for testing the <see cref="IdleTimeout"/>.
/// </summary>
public TimeProvider TimeProvider { get; set; } = TimeProvider.System;
}
105 changes: 105 additions & 0 deletions src/ModelContextProtocol.AspNetCore/IdleTrackingBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ModelContextProtocol.Protocol.Transport;

namespace ModelContextProtocol.AspNetCore;

internal sealed partial class IdleTrackingBackgroundService(
StreamableHttpHandler handler,
IOptions<HttpServerTransportOptions> options,
ILogger<IdleTrackingBackgroundService> logger) : BackgroundService
{
// The compiler will complain about the parameter being unused otherwise despite the source generator.
private ILogger _logger = logger;

// We can make this configurable once we properly harden the MCP server. In the meantime, anyone running
// this should be taking a cattle not pets approach to their servers and be able to launch more processes
// to handle more than 10,000 idle sessions at a time.
private const int MaxIdleSessionCount = 10_000;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var timeProvider = options.Value.TimeProvider;
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(5), timeProvider);

try
{
while (!stoppingToken.IsCancellationRequested && await timer.WaitForNextTickAsync(stoppingToken))
{
var idleActivityCutoff = timeProvider.GetTimestamp() - options.Value.IdleTimeout.Ticks;

var idleCount = 0;
foreach (var (_, session) in handler.Sessions)
{
if (session.IsActive || session.SessionClosed.IsCancellationRequested)
{
// There's a request currently active or the session is already being closed.
continue;
}

idleCount++;
if (idleCount == MaxIdleSessionCount)
{
// Emit critical log at most once every 5 seconds the idle count it exceeded,
//since the IdleTimeout will no longer be respected.
LogMaxSessionIdleCountExceeded();
}
else if (idleCount < MaxIdleSessionCount && session.LastActivityTicks > idleActivityCutoff)
{
continue;
}

if (handler.Sessions.TryRemove(session.Id, out var removedSession))
{
LogSessionIdle(removedSession.Id);

// Don't slow down the idle tracking loop. DisposeSessionAsync logs. We only await during graceful shutdown.
_ = DisposeSessionAsync(removedSession);
}
}
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
}
finally
{
if (stoppingToken.IsCancellationRequested)
{
List<Task> disposeSessionTasks = [];

foreach (var (sessionKey, _) in handler.Sessions)
{
if (handler.Sessions.TryRemove(sessionKey, out var session))
{
disposeSessionTasks.Add(DisposeSessionAsync(session));
}
}

await Task.WhenAll(disposeSessionTasks);
}
}
}

private async Task DisposeSessionAsync(HttpMcpSession<StreamableHttpServerTransport> session)
{
try
{
await session.DisposeAsync();
}
catch (Exception ex)
{
LogSessionDisposeError(session.Id, ex);
}
}

[LoggerMessage(Level = LogLevel.Information, Message = "Closing idle session {sessionId}.")]
private partial void LogSessionIdle(string sessionId);

[LoggerMessage(Level = LogLevel.Critical, Message = "Exceeded static maximum of 10,000 idle connections. Now clearing all inactive connections regardless of timeout.")]
private partial void LogMaxSessionIdleCountExceeded();

[LoggerMessage(Level = LogLevel.Error, Message = "Error disposing the IMcpServer for session {sessionId}.")]
private partial void LogSessionDisposeError(string sessionId, Exception ex);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Metadata;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using ModelContextProtocol.AspNetCore;
using ModelContextProtocol.Protocol.Messages;
using System.Diagnostics.CodeAnalysis;

namespace Microsoft.AspNetCore.Builder;
Expand All @@ -11,21 +14,42 @@ namespace Microsoft.AspNetCore.Builder;
public static class McpEndpointRouteBuilderExtensions
{
/// <summary>
/// Sets up endpoints for handling MCP HTTP Streaming transport.
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the protocol specification</see> for details about the Streamable HTTP transport.
/// Sets up endpoints for handling MCP Streamable HTTP transport.
/// See <see href="https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">the 2025-03-26 protocol specification</see> for details about the Streamable HTTP transport.
/// Also maps legacy SSE endpoints for backward compatibility at the path "/sse" and "/message". <see href="https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">the 2024-11-05 protocol specification</see> for details about the HTTP with SSE transport.
/// </summary>
/// <param name="endpoints">The web application to attach MCP HTTP endpoints.</param>
/// <param name="pattern">The route pattern prefix to map to.</param>
/// <returns>Returns a builder for configuring additional endpoint conventions like authorization policies.</returns>
public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpoints, [StringSyntax("Route")] string pattern = "")
{
var handler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
var streamableHttpHandler = endpoints.ServiceProvider.GetService<StreamableHttpHandler>() ??
throw new InvalidOperationException("You must call WithHttpTransport(). Unable to find required services. Call builder.Services.AddMcpServer().WithHttpTransport() in application startup code.");

var routeGroup = endpoints.MapGroup(pattern);
routeGroup.MapGet("", handler.HandleRequestAsync);
routeGroup.MapGet("/sse", handler.HandleRequestAsync);
routeGroup.MapPost("/message", handler.HandleRequestAsync);
return routeGroup;
var mcpGroup = endpoints.MapGroup(pattern);
var streamableHttpGroup = mcpGroup.MapGroup("")
.WithDisplayName(b => $"MCP Streamable HTTP | {b.DisplayName}")
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status404NotFound, typeof(JsonRpcError), contentTypes: ["application/json"]));

streamableHttpGroup.MapPost("", streamableHttpHandler.HandlePostRequestAsync)
.WithMetadata(new AcceptsMetadata(["application/json"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);

// Map legacy HTTP with SSE endpoints.
var sseHandler = endpoints.ServiceProvider.GetRequiredService<SseHandler>();
var sseGroup = mcpGroup.MapGroup("")
.WithDisplayName(b => $"MCP HTTP with SSE | {b.DisplayName}");

sseGroup.MapGet("/sse", sseHandler.HandleSseRequestAsync)
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
sseGroup.MapPost("/message", sseHandler.HandleMessageRequestAsync)
.WithMetadata(new AcceptsMetadata(["application/json"]))
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));

return mcpGroup;
}
}
Loading