Description
Bug description
When a JSON-RPC notification is sent with the MCP Java SDK's sendNotification method over the Streamable HTTP transport, the call times out if the server does not return a response body or headers. This happens even though a notification should not expect a response from the server. As a result, the returned Mono never completes, and the client hangs until the timeout fires.
Environment
MCP Java SDK version: JAR built from the July 4 commit.
Java version: 17
OS: Windows
Steps to reproduce
- Use the MCP Java SDK with the Streamable HTTP transport.
- Call client.initialize(). As part of initialization, the client sends a notifications/initialized notification to the MCP server, which (as per the spec) does not return a response for notifications.
- Observe that a timeout occurs and the Mono returned by sendNotification never completes.
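For anyone without a suitable MCP server at hand, the server-side behavior from the steps above can be approximated with a JDK-only stub. This is a rough sketch and not part of the SDK: the class name `NoBodyNotificationStub` and the crude check for an `"id"` member are assumptions made for illustration. It only acknowledges notifications with 202 Accepted and an empty body, which is my reading of what the Streamable HTTP spec asks for. It does not implement `initialize`, so it isolates the empty response and not the whole handshake.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class NoBodyNotificationStub {

    // Crude heuristic (assumption): a JSON-RPC notification carries no "id" member.
    static boolean isNotification(String json) {
        return !json.contains("\"id\"");
    }

    // Starts the stub on the given port (0 picks any free port) and serves /mcp/.
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/mcp/", exchange -> {
            try {
                String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                if ("POST".equals(exchange.getRequestMethod()) && isNotification(body)) {
                    // 202 Accepted; a length of -1 means no response body is sent.
                    exchange.sendResponseHeaders(202, -1);
                } else {
                    // Requests such as initialize are out of scope for this stub.
                    exchange.sendResponseHeaders(501, -1);
                }
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        start(6009);
        System.out.println("Stub listening on http://localhost:6009/mcp/");
    }
}
```

Posting a notification to this stub with any plain HTTP client returns immediately, which suggests the hang comes from how the client waits for the response and not from the server.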
Expected behavior
The notification should be sent and the returned Mono should complete successfully, even if the server does not return any response. No timeout should occur.
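To make the expectation concrete, here is roughly the behavior I would expect for notifications, sketched with plain `java.net.http` and without the SDK's Reactor pipeline. `NotificationSender` and `sendNotification` are hypothetical names, not SDK APIs. The point is only that completion is tied to the HTTP status, not to a response body or a Content-Type header.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

public class NotificationSender {

    // Any 2xx status counts as delivered; a response body is not required.
    static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    // Posts a JSON-RPC notification. The future completes once the response
    // has been received; any body the server sends is discarded.
    static CompletableFuture<Void> sendNotification(HttpClient client, URI endpoint, String json) {
        HttpRequest request = HttpRequest.newBuilder(endpoint)
            .header("Content-Type", "application/json")
            .header("Accept", "application/json, text/event-stream")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
        return client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
            .thenAccept(response -> {
                if (!isSuccess(response.statusCode())) {
                    throw new IllegalStateException(
                        "Notification rejected with HTTP " + response.statusCode());
                }
            });
    }

    public static void main(String[] args) {
        // Assumes a server is listening at this address, as in the example in this issue.
        URI endpoint = URI.create("http://localhost:6009/mcp/");
        String json = "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}";
        sendNotification(HttpClient.newHttpClient(), endpoint, json).join();
        System.out.println("Notification delivered");
    }
}
```

A non-2xx status still surfaces as a failed future, so the caller sees an error in that case and not a silent success.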
Minimal Complete Reproducible example
McpClientTransport transport;
String url = "http://localhost:6009";
HttpClientStreamableHttpTransport.Builder httpBuilder = HttpClientStreamableHttpTransport.builder(url);
httpBuilder.endpoint("/mcp/");
transport = httpBuilder.build();
McpSyncClient client = McpClient.sync(transport)
.requestTimeout(Duration.ofSeconds(20)) // Set a reasonable request timeout
.initializationTimeout(Duration.ofSeconds(21)) // Set a reasonable initialization timeout
.build();
try {
// Initialize the connection
client.initialize();
McpSchema.ListToolsResult tools = client.listTools();
logger.log(Level.INFO, "Available tools: {0}", tools.tools());
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to initialize MCP client from DebugServlet", e);
}
Workaround I applied locally (it may not be the proper fix):
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
logger.warning("DEBUG:: sendMessage called with message: " + sendMessage);
return Mono.create(messageSink -> {
logger.log(Level.INFO, "Sending message {0}", sendMessage);
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
HttpRequest.Builder requestBuilder = this.requestBuilder.copy();
if (transportSession != null && transportSession.sessionId().isPresent()) {
requestBuilder = requestBuilder.header("mcp-session-id", transportSession.sessionId().get());
}
String jsonBody = this.toString(sendMessage);
HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint))
.header("Accept", TEXT_EVENT_STREAM + ", " + APPLICATION_JSON)
.header("Content-Type", APPLICATION_JSON)
.header("Cache-Control", "no-cache")
.POST(HttpRequest.BodyPublishers.ofString(jsonBody))
.build();
Disposable connection;
if (sendMessage instanceof McpSchema.JSONRPCNotification) {
logger.log(Level.WARNING, "DEBUG::: Sending notification message, no response expected");
// For notifications, complete the Mono immediately after sending the request
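Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(null)))
.onErrorMap(CompletionException.class, t -> t.getCause())
// Hand the error to subscribe() so it reaches the caller and is not also reported as unhandled
.subscribe(response -> messageSink.success(), messageSink::error);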
connection = null;
} else {
connection = Flux.<ResponseEvent>create(responseEventSink -> {
// Create the async request with proper body subscriber selection
Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(responseEventSink))
.whenComplete((response, throwable) -> {
if (throwable != null) {
responseEventSink.error(throwable);
}
else {
logger.log(Level.FINE, "SSE connection established successfully");
}
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
}).flatMap(responseEvent -> {
if (transportSession.markInitialized(
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.
reconnect(null).contextWrite(messageSink.contextView()).subscribe();
}
String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode >= 200 && statusCode < 300) {
String contentType = responseEvent.responseInfo()
.headers()
.firstValue("Content-Type")
.orElse("")
.toLowerCase();
if (contentType.isBlank()) {
logger.log(Level.INFO, "No content type returned for POST in session {0}", sessionRepresentation);
return Flux.empty();
}
else if (contentType.contains(TEXT_EVENT_STREAM)) {
return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent())
.flatMap(sseEvent -> {
try {
// We don't support batching ATM and probably won't
// since the
// next version considers removing it.
McpSchema.JSONRPCMessage message = McpSchema
.deserializeJsonRpcMessage(this.objectMapper, sseEvent.data());
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
.of(Optional.ofNullable(sseEvent.id()), List.of(message));
McpTransportStream<Disposable> sessionStream = new DefaultMcpTransportStream<>(
this.resumableStreams, this::reconnect);
logger.log(Level.FINE, "Connected stream {0}", sessionStream.streamId());
messageSink.success();
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
}
catch (IOException ioException) {
return Flux.<McpSchema.JSONRPCMessage>error(
new McpError("Error parsing JSON-RPC message: " + sseEvent.data()));
}
});
}
else if (contentType.contains(APPLICATION_JSON)) {
messageSink.success();
String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
try {
return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
}
catch (IOException e) {
return Mono.error(e);
}
}
logger.log(Level.WARNING, "Unknown media type {0} returned for POST in session {1}", new Object[]{contentType,
sessionRepresentation});
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Unknown media type returned: " + contentType));
}
else if (statusCode == NOT_FOUND) {
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
// Some implementations can return 400 when presented with a
// session id that it doesn't know about, so we will
// invalidate the session
// https://github.com/modelcontextprotocol/typescript-sdk/issues/389
else if (statusCode == BAD_REQUEST) {
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
return Flux.<McpSchema.JSONRPCMessage>error(
new RuntimeException("Failed to send message: " + responseEvent));
})
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
// handle the error first
this.handleException(t);
// inform the caller of sendMessage
messageSink.error(t);
return true;
})
.doFinally(s -> {
logger.log(Level.FINE, "SendMessage finally: {0}", s);
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
})
.contextWrite(messageSink.contextView())
.subscribe();
}
if (connection != null) {
disposableRef.set(connection);
transportSession.addConnection(connection);
}
});
}
Note: I am posting this here to help get the bug fixed early. We are eagerly waiting for the release of Streamable HTTP client/server support from you guys. Appreciate your efforts!