In Streamable HTTP transport, the sendNotification calls are getting timed out. #396

@bharat-avn-14416

Bug description

When a JSON-RPC notification is sent with the MCP Java SDK's sendNotification method over the Streamable HTTP transport, the call times out if the server does not return any response body or headers. Notifications should not expect a response from the server, yet the returned Mono never completes, so the client hangs until the timeout fires.

Environment
MCP Java SDK version: JAR built from the July 4 commit.
Java version: 17
OS: Windows

Steps to reproduce

  1. Use the MCP Java SDK with the Streamable HTTP transport.
  2. Call client.initialize(). Initialization sends a notifications/initialized notification to an MCP server that, per the spec, does not return a response to notifications.
  3. Observe that a timeout occurs and the Mono returned by sendNotification never completes.

Expected behavior

The notification should be sent and the returned Mono should complete successfully, even if the server does not return any response. No timeout should occur.

Minimal Complete Reproducible example

String url = "http://localhost:6009";
HttpClientStreamableHttpTransport.Builder httpBuilder = HttpClientStreamableHttpTransport.builder(url);
httpBuilder.endpoint("/mcp/");
McpClientTransport transport = httpBuilder.build();

McpSyncClient client = McpClient.sync(transport)
		.requestTimeout(Duration.ofSeconds(20)) // Set a reasonable request timeout
		.initializationTimeout(Duration.ofSeconds(21)) // Set a reasonable initialization timeout
		.build();

try {
	// Initialize the connection; this also sends notifications/initialized
	client.initialize();

	McpSchema.ListToolsResult tools = client.listTools();
	logger.log(Level.INFO, "Available tools: {0}", tools.tools());

} catch (Exception e) {
	logger.log(Level.SEVERE, "Failed to initialize MCP client from DebugServlet", e);
}
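To separate server behavior from SDK behavior, it can help to check that a bare POST of the notification completes promptly when the server acknowledges it without a body. The sketch below uses only the JDK (com.sun.net.httpserver and java.net.http), not the MCP SDK or Reactor. The class name NotificationAckDemo, the stub server, and the 202 status are assumptions for illustration; 202 Accepted with no body is how I read the Streamable HTTP section of the MCP spec for accepted notifications, and the real server in this report may answer differently.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone check; none of these names come from the MCP SDK.
public class NotificationAckDemo {

    // Starts a stub endpoint that drains any request and answers 202 with no body.
    // Pass port 0 to let the OS pick a free port.
    static HttpServer startStubServer(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/mcp", exchange -> {
            exchange.getRequestBody().readAllBytes(); // consume the request body
            exchange.sendResponseHeaders(202, -1);    // -1 means "no response body"
            exchange.close();
        });
        server.start();
        return server;
    }

    // Posts a notifications/initialized message and returns the HTTP status code.
    // The body is discarded, so the future completes as soon as the response is read.
    static int postNotification(URI endpoint) throws Exception {
        String body = "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}";
        HttpRequest request = HttpRequest.newBuilder(endpoint)
                .header("Accept", "text/event-stream, application/json")
                .header("Content-Type", "application/json")
                .timeout(Duration.ofSeconds(5))
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .sendAsync(request, HttpResponse.BodyHandlers.discarding())
                .get(5, TimeUnit.SECONDS); // fails with TimeoutException if it hangs
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = startStubServer(0);
        try {
            URI endpoint = URI.create(
                    "http://127.0.0.1:" + server.getAddress().getPort() + "/mcp/");
            System.out.println("status = " + postNotification(endpoint));
        } finally {
            server.stop(0);
        }
    }
}
```

If this completes quickly against the real server URL while the SDK client still times out, that points at how the transport waits for a response event rather than at the server.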

Workaround I used (it may not be the proper fix):

public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
		logger.warning("DEBUG:: sendMessage called with message: " + sendMessage);
		return Mono.create(messageSink -> {
			// java.util.logging uses MessageFormat placeholders ({0}), not SLF4J-style {}
			logger.log(Level.INFO, "Sending message {0}", sendMessage);

			final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
			final McpTransportSession<Disposable> transportSession = this.activeSession.get();

			HttpRequest.Builder requestBuilder = this.requestBuilder.copy();

			if (transportSession != null && transportSession.sessionId().isPresent()) {
				requestBuilder = requestBuilder.header("mcp-session-id", transportSession.sessionId().get());
			}

			String jsonBody = this.toString(sendMessage);

			HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint))
				.header("Accept", TEXT_EVENT_STREAM + ", " + APPLICATION_JSON)
				.header("Content-Type", APPLICATION_JSON)
				.header("Cache-Control", "no-cache")
				.POST(HttpRequest.BodyPublishers.ofString(jsonBody))
				.build();

			Disposable connection;
			if (sendMessage instanceof McpSchema.JSONRPCNotification) {
				logger.log(Level.WARNING, "DEBUG::: Sending notification message, no response expected");
				// For notifications, complete the Mono immediately after sending the request
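				// Pass the callbacks to subscribe() itself so that an error is delivered
				// to the sink only, instead of also being reported as unhandled
				Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(null)))
						.subscribe(response -> {
						}, messageSink::error, messageSink::success);
				connection = null;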
			} else {
				connection = Flux.<ResponseEvent>create(responseEventSink -> {

							// Create the async request with proper body subscriber selection
							Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(responseEventSink))
									.whenComplete((response, throwable) -> {
										if (throwable != null) {
											responseEventSink.error(throwable);
										}
										else {
											logger.log(Level.FINE, "SSE connection established successfully");
										}
									})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();

						}).flatMap(responseEvent -> {
							if (transportSession.markInitialized(
									responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
								// Once we have a session, we try to open an async stream for
								// the server to send notifications and requests out-of-band.

								reconnect(null).contextWrite(messageSink.contextView()).subscribe();
							}

							String sessionRepresentation = sessionIdOrPlaceholder(transportSession);

							int statusCode = responseEvent.responseInfo().statusCode();

							if (statusCode >= 200 && statusCode < 300) {

								String contentType = responseEvent.responseInfo()
										.headers()
										.firstValue("Content-Type")
										.orElse("")
										.toLowerCase();

								if (contentType.isBlank()) {
									logger.log(Level.INFO, "No content type returned for POST in session {0}", sessionRepresentation);
									return Flux.empty();
								}
								else if (contentType.contains(TEXT_EVENT_STREAM)) {
									return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent())
											.flatMap(sseEvent -> {
												try {
													// We don't support batching ATM and probably won't
													// since the
													// next version considers removing it.
													McpSchema.JSONRPCMessage message = McpSchema
															.deserializeJsonRpcMessage(this.objectMapper, sseEvent.data());

													Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
															.of(Optional.ofNullable(sseEvent.id()), List.of(message));

													McpTransportStream<Disposable> sessionStream = new DefaultMcpTransportStream<>(
															this.resumableStreams, this::reconnect);

													logger.log(Level.FINE, "Connected stream {0}", sessionStream.streamId());

													messageSink.success();

													return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
												}
												catch (IOException ioException) {
													return Flux.<McpSchema.JSONRPCMessage>error(
															new McpError("Error parsing JSON-RPC message: " + sseEvent.data()));
												}
											});
								}
								else if (contentType.contains(APPLICATION_JSON)) {
									messageSink.success();
									String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
									try {
										return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
									}
									catch (IOException e) {
										return Mono.error(e);
									}
								}
								logger.log(Level.WARNING, "Unknown media type {0} returned for POST in session {1}", new Object[]{contentType,
										sessionRepresentation});

								return Flux.<McpSchema.JSONRPCMessage>error(
										new RuntimeException("Unknown media type returned: " + contentType));
							}
							else if (statusCode == NOT_FOUND) {
								McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
										"Session not found for session ID: " + sessionRepresentation);
								return Flux.<McpSchema.JSONRPCMessage>error(exception);
							}
							// Some implementations can return 400 when presented with a
							// session id that it doesn't know about, so we will
							// invalidate the session
							// https://github.com/modelcontextprotocol/typescript-sdk/issues/389
							else if (statusCode == BAD_REQUEST) {
								McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
										"Session not found for session ID: " + sessionRepresentation);
								return Flux.<McpSchema.JSONRPCMessage>error(exception);
							}

							return Flux.<McpSchema.JSONRPCMessage>error(
									new RuntimeException("Failed to send message: " + responseEvent));
						})
						.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
						.onErrorMap(CompletionException.class, t -> t.getCause())
						.onErrorComplete(t -> {
							// handle the error first
							this.handleException(t);
							// inform the caller of sendMessage
							messageSink.error(t);
							return true;
						})
						.doFinally(s -> {
							logger.log(Level.FINE, "SendMessage finally: {0}", s);
							Disposable ref = disposableRef.getAndSet(null);
							if (ref != null) {
								transportSession.removeConnection(ref);
							}
						})
						.contextWrite(messageSink.contextView())
						.subscribe();
			}



			if (connection != null) {
				disposableRef.set(connection);
				transportSession.addConnection(connection);
			}
		});
	}
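The response handling in the method above boils down to a small decision table over status code and Content-Type. One detail stands out when it is laid out this way: in the code as posted, the SSE and JSON branches call messageSink.success(), while the blank Content-Type branch returns Flux.empty() without signalling the sink. The sketch below is a plain-Java restatement of that table, not SDK code. The names PostResponseClassifier, Outcome and completesSend are made up for illustration, and treating a body-less 2xx as a completed send is an assumption about the intended behavior, not a confirmed fix.

```java
import java.util.Locale;

// Hypothetical helper that mirrors the branches of sendMessage; not part of the MCP SDK.
public class PostResponseClassifier {

    enum Outcome { ACK_ONLY, SSE_STREAM, JSON_BODY, SESSION_NOT_FOUND, FAILURE }

    // Maps an HTTP status code and Content-Type header to the branch taken.
    static Outcome classify(int statusCode, String contentType) {
        String type = contentType == null ? "" : contentType.toLowerCase(Locale.ROOT);
        if (statusCode >= 200 && statusCode < 300) {
            if (type.isBlank()) {
                return Outcome.ACK_ONLY;      // accepted, no body (e.g. a notification)
            }
            if (type.contains("text/event-stream")) {
                return Outcome.SSE_STREAM;
            }
            if (type.contains("application/json")) {
                return Outcome.JSON_BODY;
            }
            return Outcome.FAILURE;           // unknown media type
        }
        if (statusCode == 404 || statusCode == 400) {
            return Outcome.SESSION_NOT_FOUND; // 400 is treated like 404, as in the code above
        }
        return Outcome.FAILURE;
    }

    // Assumption: every successful branch, including ACK_ONLY, should complete
    // the Mono returned to the caller of sendMessage.
    static boolean completesSend(Outcome outcome) {
        return outcome == Outcome.ACK_ONLY
                || outcome == Outcome.SSE_STREAM
                || outcome == Outcome.JSON_BODY;
    }

    public static void main(String[] args) {
        System.out.println(classify(202, null));
        System.out.println(classify(200, "text/event-stream"));
        System.out.println(classify(200, "application/json; charset=utf-8"));
        System.out.println(classify(404, ""));
    }
}
```

Under that assumption, completing the sink in the ACK_ONLY case would cover notifications without a separate instanceof branch, provided the transport actually emits a response event when the server sends headers but no body.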

Note: Posting this here to help fix this bug early. We are eagerly waiting for the release of Streamable Http client / server support from you guys. Appreciate your efforts!
