|
35 | 35 | import java.util.function.Function;
|
36 | 36 | import java.util.function.Supplier;
|
37 | 37 |
|
| 38 | +import static io.modelcontextprotocol.spec.McpSchema.Headers.MCP_SESSION_ID; |
| 39 | + |
38 | 40 | /**
|
39 | 41 | * An implementation of the Streamable HTTP protocol as defined by the
|
40 | 42 | * <code>2025-03-26</code> version of the MCP specification.
|
@@ -129,7 +131,7 @@ private DefaultMcpTransportSession createTransportSession() {
|
129 | 131 | DefaultMcpTransportSession transportSession = this.activeSession.get();
|
130 | 132 | return transportSession.sessionId().isEmpty() ? Mono.empty()
|
131 | 133 | : webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
|
132 |
| - httpHeaders.add("mcp-session-id", transportSession.sessionId().get()); |
| 134 | + httpHeaders.add(MCP_SESSION_ID, transportSession.sessionId().get()); |
133 | 135 | }).retrieve().toBodilessEntity().doOnError(e -> logger.info("Got response {}", e)).then();
|
134 | 136 | };
|
135 | 137 | return new DefaultMcpTransportSession(onClose);
|
@@ -185,7 +187,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
|
185 | 187 | .uri(this.endpoint)
|
186 | 188 | .accept(MediaType.TEXT_EVENT_STREAM)
|
187 | 189 | .headers(httpHeaders -> {
|
188 |
| - transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id)); |
| 190 | + transportSession.sessionId().ifPresent(id -> httpHeaders.add(MCP_SESSION_ID, id)); |
189 | 191 | if (stream != null) {
|
190 | 192 | stream.lastId().ifPresent(id -> httpHeaders.add("last-event-id", id));
|
191 | 193 | }
|
@@ -243,12 +245,11 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
|
243 | 245 | .uri(this.endpoint)
|
244 | 246 | .accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
|
245 | 247 | .headers(httpHeaders -> {
|
246 |
| - transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id)); |
| 248 | + transportSession.sessionId().ifPresent(id -> httpHeaders.add(MCP_SESSION_ID, id)); |
247 | 249 | })
|
248 | 250 | .bodyValue(message)
|
249 | 251 | .exchangeToFlux(response -> {
|
250 |
| - if (transportSession |
251 |
| - .markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) { |
| 252 | + if (transportSession.markInitialized(response.headers().asHttpHeaders().getFirst(MCP_SESSION_ID))) { |
252 | 253 | // Once we have a session, we try to open an async stream for
|
253 | 254 | // the server to send notifications and requests out-of-band.
|
254 | 255 | reconnect(null).contextWrite(sink.contextView()).subscribe();
|
|
0 commit comments