From b439b89bbd765ba9541bf97c538eb487f62efd09 Mon Sep 17 00:00:00 2001 From: logan Date: Thu, 19 Jun 2025 06:10:28 +0900 Subject: [PATCH] fix(client): sendMessage() should fail immediately when connect() fails sendMessage() waits on closeLatch until the message endpoint is ready. However, if the HTTP connection fails during the connect phase, sendMessage() wastes 10 seconds waiting on closeLatch. This commit replaces CountDownLatch with CompletableFuture and calls CompletableFuture.cancel() on connect failure. As a result, sendMessage() now fails immediately instead of waiting for 10 seconds. --- .../transport/HttpClientSseClientTransport.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index 99cf2a625..7ff288011 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -93,8 +93,8 @@ public class HttpClientSseClientTransport implements McpClientTransport { /** Flag indicating if the transport is in closing state */ private volatile boolean isClosing = false; - /** Latch for coordinating endpoint discovery */ - private final CountDownLatch closeLatch = new CountDownLatch(1); + /** Future for coordinating endpoint discovery */ + private final CompletableFuture readyFuture = new CompletableFuture<>(); /** Holds the discovered message endpoint URL */ private final AtomicReference messageEndpoint = new AtomicReference<>(); @@ -353,7 +353,7 @@ public void onEvent(SseEvent event) { if (ENDPOINT_EVENT_TYPE.equals(event.type())) { String endpoint = event.data(); messageEndpoint.set(endpoint); - closeLatch.countDown(); + readyFuture.complete(null); future.complete(null); } else if (MESSAGE_EVENT_TYPE.equals(event.type())) { @@ -376,6 +376,7 @@ public void onError(Throwable error) { logger.error("SSE connection error", error); future.completeExceptionally(error); } + readyFuture.cancel(true); } }); @@ -399,11 +400,8 @@ public Mono sendMessage(JSONRPCMessage message) { } try { - if (!closeLatch.await(10, TimeUnit.SECONDS)) { - return Mono.error(new McpError("Failed to wait for the message endpoint")); - } - } - catch (InterruptedException e) { + readyFuture.get(10, TimeUnit.SECONDS); + } catch (Exception e) { return Mono.error(new McpError("Failed to wait for the message endpoint")); }