Skip to content

fix: Add an exception handling function to WebFluxSseClientTransport #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.IOException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -123,6 +124,16 @@ public class WebFluxSseClientTransport implements McpClientTransport {
*/
private String sseEndpoint;

/**
* Handle exceptions that occur during the processing of SSE events to avoid
* connection interruption.
*/
private BiFunction<Throwable, ServerSentEvent<String>, Mono<? extends JSONRPCMessage>> sseErrorHandler = (error,
event) -> {
logger.warn("Failed to handle SSE event {}", event, error);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log will look like this:
image

return Mono.empty();
};

/**
* Constructs a new SseClientTransport with the specified WebClient builder. Uses a
* default ObjectMapper instance for JSON processing.
Expand Down Expand Up @@ -215,12 +226,26 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
else {
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
}
}).transform(handler)).subscribe();
}).onErrorResume(e -> sseErrorHandler.apply(e, event)).transform(handler)).subscribe();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection will not be closed by an McpError exception:
image


// The connection is established once the server sends the endpoint event
return messageEndpointSink.asMono().then();
}

/**
* Sets the handler for processing transport-level errors.
*
* <p>
* The provided handler will be called when errors occur during transport operations,
* such as connection failures or protocol violations.
* </p>
* @param errorHandler a consumer that processes error messages
*/
public void setSseErrorHandler(
BiFunction<Throwable, ServerSentEvent<String>, Mono<? extends JSONRPCMessage>> errorHandler) {
this.sseErrorHandler = errorHandler;
}

/**
* Sends a JSON-RPC message to the server using the endpoint provided during
* connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -86,6 +89,11 @@ public void simulateMessageEvent(String jsonMessage) {
inboundMessageCount.incrementAndGet();
}

public void simulateComment(String comment) {
events.tryEmitNext(ServerSentEvent.<String>builder().comment(comment).build());
inboundMessageCount.incrementAndGet();
}

}

void startContainer() {
Expand Down Expand Up @@ -338,4 +346,26 @@ void testMessageOrderPreservation() {
assertThat(transport.getInboundMessageCount()).isEqualTo(3);
}

@Test
void customErrorHandlerShouldProcessErrors() throws InterruptedException {
AtomicReference<ServerSentEvent<String>> receivedErrorEvent = new AtomicReference<>();
AtomicReference<Throwable> handledError = new AtomicReference<>();

transport.setSseErrorHandler((error, event) -> {
receivedErrorEvent.set(event);
handledError.set(error);
return Mono.empty();
});

// Mock receive a common message `: This is a comment.\n\n`
transport.simulateComment("This is a comment.");

assertThat(receivedErrorEvent.get().comment()).isNotNull().isEqualTo("This is a comment.");

// Mock receive a common message `:ping - 2025-05-06 08:42:06.508759+00:00\n\n`
transport.simulateComment("ping - 2025-05-06 08:42:06.508759+00:00");

assertThat(receivedErrorEvent.get().comment()).isNotNull().isEqualTo("ping - 2025-05-06 08:42:06.508759+00:00");
}

}