diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index 99cf2a625..7ff288011 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -93,8 +93,8 @@ public class HttpClientSseClientTransport implements McpClientTransport { /** Flag indicating if the transport is in closing state */ private volatile boolean isClosing = false; - /** Latch for coordinating endpoint discovery */ - private final CountDownLatch closeLatch = new CountDownLatch(1); + /** Future for coordinating endpoint discovery */ + private final CompletableFuture readyFuture = new CompletableFuture<>(); /** Holds the discovered message endpoint URL */ private final AtomicReference messageEndpoint = new AtomicReference<>(); @@ -353,7 +353,7 @@ public void onEvent(SseEvent event) { if (ENDPOINT_EVENT_TYPE.equals(event.type())) { String endpoint = event.data(); messageEndpoint.set(endpoint); - closeLatch.countDown(); + readyFuture.complete(null); future.complete(null); } else if (MESSAGE_EVENT_TYPE.equals(event.type())) { @@ -376,6 +376,7 @@ public void onError(Throwable error) { logger.error("SSE connection error", error); future.completeExceptionally(error); } + readyFuture.cancel(true); } }); @@ -399,11 +400,8 @@ public Mono sendMessage(JSONRPCMessage message) { } try { - if (!closeLatch.await(10, TimeUnit.SECONDS)) { - return Mono.error(new McpError("Failed to wait for the message endpoint")); - } - } - catch (InterruptedException e) { + readyFuture.get(10, TimeUnit.SECONDS); + } catch (Exception e) { return Mono.error(new McpError("Failed to wait for the message endpoint")); }