Skip to content

Commit 09d47b2

Browse files
author
Zachary German
committed
Adding Session-owned event histories and stream replayability via GET with Last-Event-Id header
1 parent 2a97519 commit 09d47b2

File tree

9 files changed

+119
-54
lines changed

9 files changed

+119
-54
lines changed

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/transport/StreamableHttpTransportIntegrationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.modelcontextprotocol.server.transport;
66

77
import java.time.Duration;
8+
import java.util.ArrayList;
89
import java.util.Map;
910
import java.util.List;
1011
import java.util.concurrent.CountDownLatch;

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServerExchange.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import io.modelcontextprotocol.spec.McpSchema;
1313
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
1414
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
15-
import io.modelcontextprotocol.spec.McpSession;
1615
import io.modelcontextprotocol.spec.McpServerSession;
1716
import io.modelcontextprotocol.util.Assert;
1817
import reactor.core.publisher.Mono;
@@ -26,7 +25,7 @@
2625
*/
2726
public class McpAsyncServerExchange {
2827

29-
private final McpSession session;
28+
private final McpServerSession session;
3029

3130
private final McpSchema.ClientCapabilities clientCapabilities;
3231

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncStreamableHttpServer.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import io.modelcontextprotocol.server.transport.StreamableHttpServerTransportProvider;
1818
import io.modelcontextprotocol.spec.McpServerSession;
19+
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1920
import io.modelcontextprotocol.spec.McpSchema;
2021
import io.modelcontextprotocol.util.Assert;
2122
import io.modelcontextprotocol.util.DeafaultMcpUriTemplateManagerFactory;
@@ -55,11 +56,6 @@
5556
* suitable for high-throughput scenarios and reactive applications. All operations return
5657
* Mono or Flux types that can be composed into reactive pipelines.
5758
*
58-
* <p>
59-
* The server supports runtime modification of its capabilities through methods like
60-
* {@link #addTool}, {@link #addResource}, and {@link #addPrompt}, automatically notifying
61-
* connected clients of changes when configured to do so.
62-
*
6359
* @author Christian Tzolov
6460
* @author Dariusz Jędrzejczyk
6561
* @author Jihoon Kim
@@ -106,6 +102,10 @@ public class McpAsyncStreamableHttpServer {
106102
setupSessionFactory();
107103
}
108104

105+
public McpServerTransportProvider getTransportProvider() {
106+
return this.httpTransportProvider;
107+
}
108+
109109
/**
110110
* Sets up the request handlers for standard MCP methods.
111111
*/
@@ -438,20 +438,7 @@ public static Builder builder() {
438438
}
439439

440440
/**
441-
* Builder for creating instances of McpAsyncStreamableHttpServer with Streamable HTTP
442-
* transport.
443-
*
444-
* <p>
445-
* This builder provides a fluent API for configuring Streamable HTTP MCP servers with
446-
* enhanced features:
447-
* <ul>
448-
* <li>Single session class managing all transport streams</li>
449-
* <li>Resource management and lifecycle handling</li>
450-
* <li>Clean separation between session and transport concerns</li>
451-
* <li>Support for both immediate and streaming responses</li>
452-
* </ul>
453-
*
454-
* @author Zachary German
441+
* Builder for creating instances of McpAsyncServer
455442
*/
456443
public static class Builder {
457444

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,8 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
215215
PrintWriter writer = response.getWriter();
216216

217217
// Create a new session transport
218-
HttpServletMcpSessionTransport sessionTransport = new HttpServletMcpSessionTransport(
219-
McpServerSession.LISTENING_TRANSPORT, asyncContext, writer);
218+
HttpServletMcpSessionTransport sessionTransport = new HttpServletMcpSessionTransport(sessionId, asyncContext,
219+
writer);
220220

221221
// Create a new session using the session factory
222222
McpServerSession session = sessionFactory.create(sessionTransport);

mcp/src/main/java/io/modelcontextprotocol/server/transport/StreamableHttpServerTransportProvider.java

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.concurrent.atomic.AtomicBoolean;
1818
import java.util.concurrent.atomic.AtomicLong;
1919
import java.util.function.Supplier;
20+
import java.util.stream.Collectors;
2021

2122
import com.fasterxml.jackson.core.type.TypeReference;
2223
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -30,6 +31,7 @@
3031
import io.modelcontextprotocol.spec.McpServerSession;
3132
import io.modelcontextprotocol.spec.McpServerTransport;
3233
import io.modelcontextprotocol.spec.McpServerTransportProvider;
34+
import io.modelcontextprotocol.spec.SseEvent;
3335
import io.modelcontextprotocol.util.Assert;
3436
import jakarta.servlet.AsyncContext;
3537
import jakarta.servlet.ReadListener;
@@ -89,6 +91,8 @@ public class StreamableHttpServerTransportProvider extends HttpServlet implement
8991

9092
public static final String ALLOW_ORIGIN_DEFAULT_VALUE = "*";
9193

94+
public static final String PROTOCOL_VERSION_HEADER = "MCP-Protocol-Version";
95+
9296
public static final String CACHE_CONTROL_HEADER = "Cache-Control";
9397

9498
public static final String CONNECTION_HEADER = "Connection";
@@ -117,7 +121,7 @@ public class StreamableHttpServerTransportProvider extends HttpServlet implement
117121
private final Supplier<String> sessionIdProvider;
118122

119123
/** Sessions map, keyed by Session ID */
120-
private final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>();
124+
private static final Map<String, McpServerSession> sessions = new ConcurrentHashMap<>();
121125

122126
/** Flag indicating if the transport is in the process of shutting down */
123127
private final AtomicBoolean isClosing = new AtomicBoolean(false);
@@ -128,6 +132,7 @@ public class StreamableHttpServerTransportProvider extends HttpServlet implement
128132
/** Callback interface for session lifecycle and errors */
129133
private SessionHandler sessionHandler;
130134

135+
/** Factory for McpServerSession takes session IDs */
131136
private McpServerSession.StreamableHttpSessionFactory streamableHttpSessionFactory;
132137

133138
/**
@@ -242,6 +247,13 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
242247
return;
243248
}
244249

250+
// Delayed until version negotiation is implemented.
251+
/*
252+
* if (session.getState().equals(session.STATE_INITIALIZED) &&
253+
* request.getHeader(PROTOCOL_VERSION_HEADER) == null) {
254+
* sendErrorResponse(response, "Protocol version missing in request header"); }
255+
*/
256+
245257
// Set up SSE connection
246258
response.setContentType(TEXT_EVENT_STREAM);
247259
response.setCharacterEncoding(UTF_8);
@@ -254,10 +266,18 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
254266

255267
String lastEventId = request.getHeader(LAST_EVENT_ID_HEADER);
256268

257-
SseTransport sseTransport = new SseTransport(objectMapper, response, asyncContext, lastEventId);
258-
session.registerTransport(session.LISTENING_TRANSPORT, sseTransport);
259-
260-
logger.debug("Registered SSE transport {} for session {}", session.LISTENING_TRANSPORT, sessionId);
269+
if (lastEventId == null) { // Just opening a listening stream
270+
SseTransport sseTransport = new SseTransport(objectMapper, response, asyncContext, lastEventId,
271+
session.LISTENING_TRANSPORT, sessionId);
272+
session.registerTransport(session.LISTENING_TRANSPORT, sseTransport);
273+
logger.debug("Registered SSE transport {} for session {}", session.LISTENING_TRANSPORT, sessionId);
274+
}
275+
else { // Asking for a stream to replay events from a previous request
276+
SseTransport sseTransport = new SseTransport(objectMapper, response, asyncContext, lastEventId,
277+
request.getRequestId(), sessionId);
278+
session.registerTransport(request.getRequestId(), sseTransport);
279+
logger.debug("Registered SSE transport {} for session {}", session.LISTENING_TRANSPORT, sessionId);
280+
}
261281
}
262282

263283
@Override
@@ -328,6 +348,15 @@ public void onAllDataRead() throws IOException {
328348
asyncContext.complete();
329349
return;
330350
}
351+
352+
// Delayed until version negotiation is implemented.
353+
/*
354+
* if (session.getState().equals(session.STATE_INITIALIZED) &&
355+
* request.getHeader(PROTOCOL_VERSION_HEADER) == null) {
356+
* sendErrorResponse(response,
357+
* "Protocol version missing in request header"); }
358+
*/
359+
331360
logger.debug("Using session: {}", sessionId);
332361

333362
response.setHeader(SESSION_ID_HEADER, sessionId);
@@ -362,7 +391,8 @@ else if (id instanceof Integer) {
362391
response.setHeader(CACHE_CONTROL_HEADER, CACHE_CONTROL_NO_CACHE);
363392
response.setHeader(CONNECTION_HEADER, CONNECTION_KEEP_ALIVE);
364393

365-
SseTransport sseTransport = new SseTransport(objectMapper, response, asyncContext, null);
394+
SseTransport sseTransport = new SseTransport(objectMapper, response, asyncContext, null,
395+
transportId, sessionId);
366396
session.registerTransport(transportId, sseTransport);
367397
}
368398
else {
@@ -650,13 +680,17 @@ private static class SseTransport implements McpServerTransport {
650680

651681
private final Map<String, SseEvent> eventHistory = new ConcurrentHashMap<>();
652682

653-
private final AtomicLong eventCounter = new AtomicLong(0);
683+
private final String id;
684+
685+
private final String sessionId;
654686

655687
public SseTransport(ObjectMapper objectMapper, HttpServletResponse response, AsyncContext asyncContext,
656-
String lastEventId) {
688+
String lastEventId, String transportId, String sessionId) {
657689
this.objectMapper = objectMapper;
658690
this.response = response;
659691
this.asyncContext = asyncContext;
692+
this.id = transportId;
693+
this.sessionId = sessionId;
660694

661695
setupSseStream(lastEventId);
662696
}
@@ -710,9 +744,19 @@ private void setupSseStream(String lastEventId) {
710744

711745
private void replayEventsAfter(String lastEventId) {
712746
try {
713-
long lastId = Long.parseLong(lastEventId);
714-
for (long i = lastId + 1; i <= eventCounter.get(); i++) {
715-
SseEvent event = eventHistory.get(String.valueOf(i));
747+
McpServerSession session = sessions.get(sessionId);
748+
String transportIdOfLastEventId = session.getTransportIdForEvent(lastEventId);
749+
Map<String, SseEvent> transportEventHistory = session
750+
.getTransportEventHistory(transportIdOfLastEventId);
751+
List<String> eventIds = transportEventHistory.keySet()
752+
.stream()
753+
.map(Long::parseLong)
754+
.filter(key -> key > Long.parseLong(lastEventId))
755+
.sorted()
756+
.map(String::valueOf)
757+
.collect(Collectors.toList());
758+
for (String eventId : eventIds) {
759+
SseEvent event = transportEventHistory.get(eventId);
716760
if (event != null) {
717761
eventSink.tryEmitNext(event);
718762
}
@@ -727,7 +771,7 @@ private void replayEventsAfter(String lastEventId) {
727771
public Mono<Void> sendMessage(JSONRPCMessage message) {
728772
try {
729773
String jsonText = objectMapper.writeValueAsString(message);
730-
String eventId = String.valueOf(eventCounter.incrementAndGet());
774+
String eventId = sessions.get(sessionId).incrementAndGetEventId(id);
731775
SseEvent event = new SseEvent(eventId, MESSAGE_EVENT_TYPE, jsonText);
732776

733777
eventHistory.put(eventId, event);
@@ -737,6 +781,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
737781
if (message instanceof McpSchema.JSONRPCResponse) {
738782
logger.debug("Completing SSE stream after sending response");
739783
eventSink.tryEmitComplete();
784+
sessions.get(sessionId).setTransportEventHistory(id, eventHistory);
740785
}
741786

742787
return Mono.empty();
@@ -754,7 +799,7 @@ public Mono<Void> sendMessageStream(Flux<JSONRPCMessage> messageStream) {
754799
return messageStream.doOnNext(message -> {
755800
try {
756801
String jsonText = objectMapper.writeValueAsString(message);
757-
String eventId = String.valueOf(eventCounter.incrementAndGet());
802+
String eventId = sessions.get(sessionId).incrementAndGetEventId(id);
758803
SseEvent event = new SseEvent(eventId, MESSAGE_EVENT_TYPE, jsonText);
759804

760805
eventHistory.put(eventId, event);
@@ -768,6 +813,7 @@ public Mono<Void> sendMessageStream(Flux<JSONRPCMessage> messageStream) {
768813
}).doOnComplete(() -> {
769814
logger.debug("Completing SSE stream after sending all stream messages");
770815
eventSink.tryEmitComplete();
816+
sessions.get(sessionId).setTransportEventHistory(id, eventHistory);
771817
}).then();
772818
}
773819

@@ -780,13 +826,11 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
780826
public Mono<Void> closeGracefully() {
781827
return Mono.fromRunnable(() -> {
782828
eventSink.tryEmitComplete();
829+
sessions.get(sessionId).setTransportEventHistory(id, eventHistory);
783830
logger.debug("SSE transport closed gracefully");
784831
});
785832
}
786833

787-
private record SseEvent(String id, String event, String data) {
788-
}
789-
790834
}
791835

792836
/**

mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public final class McpSchema {
4040
private McpSchema() {
4141
}
4242

43+
// TODO: Changing this breaks initialization for quite a few tests (fix).
4344
public static final String LATEST_PROTOCOL_VERSION = "2024-11-05";
4445

4546
public static final String JSONRPC_VERSION = "2.0";
@@ -200,7 +201,6 @@ public sealed interface JSONRPCMessage permits JSONRPCRequest, JSONRPCNotificati
200201

201202
@JsonInclude(JsonInclude.Include.NON_ABSENT)
202203
@JsonIgnoreProperties(ignoreUnknown = true)
203-
// TODO: batching support
204204
// @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
205205
public record JSONRPCRequest( // @formatter:off
206206
@JsonProperty("jsonrpc") String jsonrpc,
@@ -211,7 +211,6 @@ public record JSONRPCRequest( // @formatter:off
211211

212212
@JsonInclude(JsonInclude.Include.NON_ABSENT)
213213
@JsonIgnoreProperties(ignoreUnknown = true)
214-
// TODO: batching support
215214
// @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
216215
public record JSONRPCNotification( // @formatter:off
217216
@JsonProperty("jsonrpc") String jsonrpc,
@@ -221,7 +220,6 @@ public record JSONRPCNotification( // @formatter:off
221220

222221
@JsonInclude(JsonInclude.Include.NON_ABSENT)
223222
@JsonIgnoreProperties(ignoreUnknown = true)
224-
// TODO: batching support
225223
// @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
226224
public record JSONRPCResponse( // @formatter:off
227225
@JsonProperty("jsonrpc") String jsonrpc,

0 commit comments

Comments
 (0)